Python pyspark.sql module: SparkSession() example source code

The following 27 code examples, extracted from open-source Python projects, show how pyspark.sql.SparkSession() is used in practice.
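
Before the individual examples, here is a minimal sketch (not taken from any of the projects below) of the two usual ways to obtain a SparkSession: the builder pattern and wrapping an existing SparkContext. The application name is a placeholder.

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Preferred: build (or reuse) a session via the builder.
spark = SparkSession.builder.appName("example").getOrCreate()

# Also seen in the snippets below: wrap an existing SparkContext.
sc = spark.sparkContext            # or SparkContext('local[*]', 'example')
spark2 = SparkSession(sc)          # SparkSession constructed from a SparkContext

spark.stop()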

Project: MIT-Thesis    Author: alec-heif    | project source | file source
def createOrReplaceTempView(self, name):
        """Creates or replaces a local temporary view with this DataFrame.

        The lifetime of this temporary table is tied to the :class:`SparkSession`
        that was used to create this :class:`DataFrame`.

        >>> df.createOrReplaceTempView("people")
        >>> df2 = df.filter(df.age > 3)
        >>> df2.createOrReplaceTempView("people")
        >>> df3 = spark.sql("select * from people")
        >>> sorted(df3.collect()) == sorted(df2.collect())
        True
        >>> spark.catalog.dropTempView("people")

        """
        self._jdf.createOrReplaceTempView(name)
Project: MIT-Thesis    Author: alec-heif    | project source | file source
def createTempView(self, name):
        """Creates a local temporary view with this DataFrame.

        The lifetime of this temporary table is tied to the :class:`SparkSession`
        that was used to create this :class:`DataFrame`.
        throws :class:`TempTableAlreadyExistsException`, if the view name already exists in the
        catalog.

        >>> df.createTempView("people")
        >>> df2 = spark.sql("select * from people")
        >>> sorted(df.collect()) == sorted(df2.collect())
        True
        >>> df.createTempView("people")  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ...
        AnalysisException: u"Temporary table 'people' already exists;"
        >>> spark.catalog.dropTempView("people")

        """
        self._jdf.createTempView(name)
Project: MIT-Thesis    Author: alec-heif    | project source | file source
def schema(self, schema):
        """Specifies the input schema.

        Some data sources (e.g. JSON) can infer the input schema automatically from data.
        By specifying the schema here, the underlying data source can skip the schema
        inference step, and thus speed up data loading.

        :param schema: a :class:`pyspark.sql.types.StructType` object
        """
        from pyspark.sql import SparkSession
        if not isinstance(schema, StructType):
            raise TypeError("schema should be StructType")
        spark = SparkSession.builder.getOrCreate()
        jschema = spark._jsparkSession.parseDataType(schema.json())
        self._jreader = self._jreader.schema(jschema)
        return self
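
A hedged usage sketch of the method above: reading JSON with an explicit schema so that inference is skipped. The file path and column names here are assumptions, purely for illustration.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
df = spark.read.schema(schema).json("/tmp/people.json")  # no schema inference pass
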
Project: MIT-Thesis    Author: alec-heif    | project source | file source
def schema(self, schema):
        """Specifies the input schema.

        Some data sources (e.g. JSON) can infer the input schema automatically from data.
        By specifying the schema here, the underlying data source can skip the schema
        inference step, and thus speed up data loading.

        .. note:: Experimental.

        :param schema: a :class:`pyspark.sql.types.StructType` object

        >>> s = spark.readStream.schema(sdf_schema)
        """
        from pyspark.sql import SparkSession
        if not isinstance(schema, StructType):
            raise TypeError("schema should be StructType")
        spark = SparkSession.builder.getOrCreate()
        jschema = spark._jsparkSession.parseDataType(schema.json())
        self._jreader = self._jreader.schema(jschema)
        return self
Project: MIT-Thesis    Author: alec-heif    | project源码 | file source
def process(time, rdd):
        print("========= %s =========" % str(time))

        try:
            # Get the singleton instance of SparkSession
            spark = getSparkSessionInstance(rdd.context.getConf())

            # Convert RDD[String] to RDD[Row] to DataFrame
            rowRdd = rdd.map(lambda w: Row(word=w))
            wordsDataFrame = spark.createDataFrame(rowRdd)

            # Creates a temporary view using the DataFrame.
            wordsDataFrame.createOrReplaceTempView("words")

            # Do word count on table using SQL and print it
            wordCountsDataFrame = \
                spark.sql("select word, count(*) as total from words group by word")
            wordCountsDataFrame.show()
        except:
            # Ignore failures (e.g. empty batches), as in the original Spark streaming example.
            pass
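
This callback follows Spark's streaming SQL word-count pattern and relies on Row and the getSparkSessionInstance helper shown further down. A hedged sketch of how such a function is typically wired to a DStream; the host and port are placeholders.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "SqlNetworkWordCount")
ssc = StreamingContext(sc, 1)                        # 1-second batches

lines = ssc.socketTextStream("localhost", 9999)      # placeholder host/port
words = lines.flatMap(lambda line: line.split(" "))
words.foreachRDD(process)                            # calls the function above once per batch

ssc.start()
ssc.awaitTermination()
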
Project: implicit    Author: benfred    | project source | file source
def benchmark_spark(ratings, factors, iterations=5):
    conf = (SparkConf()
            .setAppName("implicit_benchmark")
            .setMaster('local[*]')
            .set('spark.driver.memory', '16G')
            )
    context = SparkContext(conf=conf)
    spark = SparkSession(context)

    times = {}
    try:
        ratings = convert_sparse_to_dataframe(spark, context, ratings)

        for rank in factors:
            als = ALS(rank=rank, maxIter=iterations,
                      alpha=1, implicitPrefs=True,
                      userCol="row", itemCol="col", ratingCol="data")
            start = time.time()
            als.fit(ratings)
            elapsed = time.time() - start
            times[rank] = elapsed / iterations
            print("spark. factors=%i took %.3f" % (rank, elapsed/iterations))
    finally:
        spark.stop()

    return times
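
convert_sparse_to_dataframe is project-specific and not shown here. A plausible sketch (an assumption, not the project's actual helper) that is consistent with the ALS column names used above ("row", "col", "data"):

import scipy.sparse

def convert_sparse_to_dataframe(spark, context, sparse_matrix):
    # Hypothetical helper: expose a SciPy sparse matrix as (row, col, data) rows.
    # `context` is unused here but kept to match the call site above.
    coo = scipy.sparse.coo_matrix(sparse_matrix)
    rows = list(zip(coo.row.tolist(), coo.col.tolist(), coo.data.tolist()))
    return spark.createDataFrame(rows, schema=["row", "col", "data"])
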
Project: provectus-final-project    Author: eds-uga    | project source | file source
def main():
    spark = SparkSession \
       .builder \
       .appName("RandomForest") \
       .config("spark.executor.heartbeatInterval","60s")\
       .getOrCreate()

    sc = spark.sparkContext
    sqlContext = SQLContext(sc)

    sc.setLogLevel("INFO")

    train_df = spark.read.parquet(sys.argv[1])
    #Persist the data in memory and disk
    train_df.persist(StorageLevel(True, True, False, False, 1))

    rfc = RandomForestClassifier(maxDepth=8, maxBins=2400000, numTrees=128,impurity="gini")
    rfc_model = rfc.fit(train_df)
    rfc_model.save(sys.argv[2] + "rfc_model")
Project: Frank-Kanes-Taming-Big-Data-with-Apache-Spark-and-Python    Author: PacktPublishing    | project source | file source
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.ITEM") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Create a SparkSession (the config bit is only for Windows!)
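
The snippet is cut off at the comment above. In this book's examples the session creation that follows usually looks something like the sketch below; the warehouse directory (Windows-only workaround) and the application name are assumptions here.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.warehouse.dir", "file:///C:/temp") \
    .appName("PopularMovies") \
    .getOrCreate()
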
Project: MIT-Thesis    Author: alec-heif    | project source | file source
def setUp(self):
        self.sc = SparkContext('local[4]', "MLlib tests")
        self.spark = SparkSession(self.sc)
Project: MIT-Thesis    Author: alec-heif    | project source | file source
def setUpClass(cls):
        PySparkTestCase.setUpClass()
        cls.spark = SparkSession(cls.sc)
Project: MIT-Thesis    Author: alec-heif    | project source | file source
def _test():
    import doctest
    from pyspark.context import SparkContext
    from pyspark.sql import Row, SQLContext, SparkSession
    import pyspark.sql.dataframe
    from pyspark.sql.functions import from_unixtime
    globs = pyspark.sql.dataframe.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
    globs['sqlContext'] = SQLContext(sc)
    globs['spark'] = SparkSession(sc)
    globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')])\
        .toDF(StructType([StructField('age', IntegerType()),
                          StructField('name', StringType())]))
    globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF()
    globs['df3'] = sc.parallelize([Row(name='Alice', age=2),
                                   Row(name='Bob', age=5)]).toDF()
    globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80),
                                   Row(name='Bob', age=5, height=None),
                                   Row(name='Tom', age=None, height=None),
                                   Row(name=None, age=None, height=None)]).toDF()
    globs['sdf'] = sc.parallelize([Row(name='Tom', time=1479441846),
                                   Row(name='Bob', time=1479442946)]).toDF()

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.dataframe, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
Project: MIT-Thesis    Author: alec-heif    | project source | file source
def _test():
    import doctest
    import os
    import tempfile
    import py4j
    from pyspark.context import SparkContext
    from pyspark.sql import SparkSession, Row
    import pyspark.sql.readwriter

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.sql.readwriter.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    try:
        spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    except py4j.protocol.Py4JError:
        spark = SparkSession(sc)

    globs['tempfile'] = tempfile
    globs['os'] = os
    globs['sc'] = sc
    globs['spark'] = spark
    globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned')
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.readwriter, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    sc.stop()
    if failure_count:
        exit(-1)
Project: MIT-Thesis    Author: alec-heif    | project source | file source
def queryName(self, queryName):
        """Specifies the name of the :class:`StreamingQuery` that can be started with
        :func:`start`. This name must be unique among all the currently active queries
        in the associated SparkSession.

        .. note:: Experimental.

        :param queryName: unique name for the query

        >>> writer = sdf.writeStream.queryName('streaming_query')
        """
        if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
            raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
        self._jwrite = self._jwrite.queryName(queryName)
        return self
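
A hedged usage sketch: naming a query, writing it to the in-memory sink, and querying the sink by that name. The 'rate' source used as input is an assumption (a toy source available in Spark 2.2+).

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.readStream.format('rate').load()        # assumed toy streaming source

query = sdf.writeStream \
    .queryName('streaming_query') \
    .format('memory') \
    .outputMode('append') \
    .start()

spark.sql("SELECT * FROM streaming_query").show()   # the memory sink table takes the query name
query.stop()
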
Project: MIT-Thesis    Author: alec-heif    | project source | file source
def _test():
    import doctest
    import os
    import tempfile
    import py4j
    from pyspark.context import SparkContext
    from pyspark.sql import Row, SparkSession, SQLContext
    import pyspark.sql.streaming

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.sql.streaming.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    try:
        spark = SparkSession.builder.getOrCreate()
    except py4j.protocol.Py4JError:
        spark = SparkSession(sc)

    globs['tempfile'] = tempfile
    globs['os'] = os
    globs['spark'] = spark
    globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
    globs['sdf'] = \
        spark.readStream.format('text').load('python/test_support/sql/streaming')
    globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
    globs['df'] = \
        globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.streaming, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    globs['spark'].stop()

    if failure_count:
        exit(-1)
Project: MIT-Thesis    Author: alec-heif    | project source | file source
def getSparkSessionInstance(sparkConf):
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=sparkConf)\
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']
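
A small sketch of the lazily created singleton in use (the SparkConf settings are assumptions): repeated calls return the same session object because it is cached in globals().

from pyspark import SparkConf

conf = SparkConf().setAppName("singleton_demo").setMaster("local[*]")
spark = getSparkSessionInstance(conf)
assert getSparkSessionInstance(conf) is spark   # same cached instance on every call
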
Project: spylon    Author: maxpoint    | project source | file source
def spark_session(self, application_name):
        sc = self.spark_context(application_name)
        from pyspark.sql import SparkSession
        return SparkSession(sc)
Project: python_mozetl    Author: mozilla    | project source | file source
def main(date, bucket, prefix, num_clusters, num_donors, kernel_bandwidth, num_pdf_points):
    spark = (SparkSession
             .builder
             .appName("taar_similarity")
             .enableHiveSupport()
             .getOrCreate())

    if num_donors < 100:
        logger.warn("Less than 100 donors were requested.", extra={"donors": num_donors})
        num_donors = 100

    logger.info("Loading the AMO whitelist...")
    whitelist = load_amo_external_whitelist()

    logger.info("Computing the list of donors...")

    # Compute the donors clusters and the LR curves.
    cluster_ids, donors_df = get_donors(spark, num_clusters, num_donors, whitelist)
    lr_curves = get_lr_curves(spark, donors_df, cluster_ids, kernel_bandwidth,
                              num_pdf_points)

    # Store them.
    donors = format_donors_dictionary(donors_df)
    store_json_to_s3(json.dumps(donors, indent=2), 'donors',
                     date, prefix, bucket)
    store_json_to_s3(json.dumps(lr_curves, indent=2), 'lr_curves',
                     date, prefix, bucket)
    spark.stop()
Project: spark-deep-learning    Author: databricks    | project source | file source
def readImages(self, path, recursive=False, numPartitions=-1,
                   dropImageFailures=False, sampleRatio=1.0, seed=0):
        """
        Reads the directory of images from the local or remote source.

        .. note:: If multiple jobs are run in parallel with different sampleRatio or recursive flag,
            there may be a race condition where one job overwrites the hadoop configs of another.

        .. note:: If sample ratio is less than 1, sampling uses a PathFilter that is efficient but
            potentially non-deterministic.

        :param str path: Path to the image directory.
        :param bool recursive: Recursive search flag.
        :param int numPartitions: Number of DataFrame partitions.
        :param bool dropImageFailures: Drop the files that are not valid images.
        :param float sampleRatio: Fraction of the images loaded.
        :param int seed: Random number seed.
        :return: a :class:`DataFrame` with a single column of "images",
               see ImageSchema for details.

        >>> df = ImageSchema.readImages('python/test_support/image/kittens', recursive=True)
        >>> df.count()
        4

        .. versionadded:: 2.3.0
        """

        ctx = SparkContext._active_spark_context
        spark = SparkSession(ctx)
        image_schema = ctx._jvm.org.apache.spark.ml.image.ImageSchema
        jsession = spark._jsparkSession
        jresult = image_schema.readImages(path, jsession, recursive, numPartitions,
                                          dropImageFailures, float(sampleRatio), seed)
        return DataFrame(jresult, spark._wrapped)
Project: aws-glue-libs    Author: awslabs    | project source | file source
def __init__(self, sparkContext, **options):
        super(GlueContext, self).__init__(sparkContext)
        register(sparkContext)
        self._glue_scala_context = self._get_glue_scala_context(**options)
        self.create_dynamic_frame = DynamicFrameReader(self)
        self.write_dynamic_frame = DynamicFrameWriter(self)
        self.spark_session = SparkSession(sparkContext, self._glue_scala_context.getSparkSession())
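
For context, a hedged sketch of how a GlueContext is typically constructed inside a Glue job environment, which is where this constructor runs and where spark_session gets populated:

from pyspark.context import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session   # the SparkSession set up in __init__ above
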
Project: spark_python    Author: xieenze    | project source | file source
def basic_df_example(spark):

    # $example on:create_df$
    # spark is an existing SparkSession
    df = spark.read.json("/home/xieenze/Desktop/spark/testweet.json")
    # Displays the content of the DataFrame to stdout
    a=df.collect()
    df.createGlobalTempView("people")

    # Global temporary view is tied to a system preserved database `global_temp`
    b=spark.sql("SELECT * FROM global_temp.people").collect()

    return a, b
Project: spark_python    Author: xieenze    | project source | file source
def getspark():
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark
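
Combining the two helpers from this project (getspark above and basic_df_example earlier), a usage sketch. Note that the JSON path inside basic_df_example is the author's local file, so this only runs where that path exists.

spark = getspark()
a, b = basic_df_example(spark)   # rows from the DataFrame and from the global temp view
print(len(a), len(b))
spark.stop()
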
Project: goal    Author: victorskl    | project source | file source
def get_content():
    spark = SparkSession \
        .builder \
        .appName("Crime Detection") \
        .config("cloudant.host", "115.146.94.41:9584/") \
        .config("cloudant.username", "cadmin") \
        .config("cloudant.password", "qwerty8888") \
        .config("cloudant.protocol", "http") \
        .config('jsonstore.rdd.partitions', 32) \
        .getOrCreate()

    spark.sql(" CREATE TEMPORARY VIEW tweetTmpView USING com.cloudant.spark OPTIONS ( database 'tweet_raw_crimeresult')")

    twt = spark.sql('SELECT text,entities,geo,created_at,user FROM tweetTmpView')
    # hashtag = spark.sql('SELECT hashtags FROM tweetTmpView')
    # twt.printSchema()

    # print 'Total # of rows in tweet_raw: ' + str(twt.count())
    content = []
    for tw in twt.collect():
        content_dict = {}
        if (len(tw.entities.hashtags) == 0):
            content_dict['twitter_content'] = tw.text.encode('utf-8')
            content_dict['geo'] = tw.geo
            content_dict['created_at'] = tw.created_at
            content_dict['location'] = tw.user.location
        else:
            content_dict['twitter_content'] = tw.text.encode('utf-8')
            content_dict['hashtag'] = tw.entities.hashtags[0].text
            content_dict['geo'] = tw.geo
            content_dict['created_at'] = tw.created_at
            content_dict['location'] = tw.user.location

        content.append(content_dict)
    return content
Project: goal    Author: victorskl    | project source | file source
def main():
    spark = SparkSession \
        .builder \
        .appName(app_name) \
        .config('cloudant.host', db_host) \
        .config('cloudant.username', db_user) \
        .config('cloudant.password', db_password) \
        .config('cloudant.protocol', 'http') \
        .config('jsonstore.rdd.partitions', part_size) \
        .getOrCreate()

    tweet = spark.read.load(db_source, 'com.cloudant.spark')
    tweet.cache()
    # tweet.printSchema()

    if not has_column(tweet, tag_name):

        filtered_tweet = tweet.filter(tweet.text.rlike(keyword))

        sentiment_udf = udf(sentiment, StringType())

        df = filtered_tweet.withColumn(tag_name, sentiment_udf(filtered_tweet['text']))

        # df.select('_id', 'text', 'tag').show()
        # print(df.count())

        df.write.format('com.cloudant.spark').save(db_target)
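
has_column and sentiment are defined elsewhere in this project. A plausible sketch of has_column (an assumption, not the project's actual code):

def has_column(df, col_name):
    # True if the DataFrame already carries the given column (here: the sentiment tag).
    return col_name in df.columns
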
Project: goal    Author: victorskl    | project source | file source
def get_content():
    spark = SparkSession \
        .builder \
        .appName("Crime Detection") \
        .config("cloudant.host", "115.146.94.41:5000") \
        .config("cloudant.username", "cadmin") \
        .config("cloudant.password", "qwerty8888") \
        .config("cloudant.protocol", "http") \
        .config('jsonstore.rdd.partitions', 32) \
        .getOrCreate()

    spark.sql(" CREATE TEMPORARY VIEW tweetTmpView USING com.cloudant.spark OPTIONS ( database 'tweet_raw_trump')")

    twt = spark.sql('SELECT text,entities,geo,created_at FROM tweetTmpView')
    # hashtag = spark.sql('SELECT hashtags FROM tweetTmpView')
    # twt.printSchema()

    # print 'Total # of rows in tweet_raw: ' + str(twt.count())
    content = []
    for tw in twt.collect():
        content_dict = {}
        if (len(tw.entities.hashtags) == 0):
            content_dict['twitter_content'] = tw.text.encode('utf-8')
            content_dict['geo'] = tw.geo
            content_dict['created_at'] = tw.created_at
        else:
            content_dict['twitter_content'] = tw.text.encode('utf-8')
            content_dict['hashtag'] = tw.entities.hashtags[0].text
            content_dict['geo'] = tw.geo
            content_dict['created_at'] = tw.created_at

        content.append(content_dict)
    return content
Project: provectus-final-project    Author: eds-uga    | project source | file source
def main():
    spark = SparkSession \
       .builder \
       .appName("RandomForest") \
       .config("spark.executor.heartbeatInterval","60s")\
       .getOrCreate()

    sc = spark.sparkContext
    sqlContext = SQLContext(sc)

    sc.setLogLevel("INFO")

    # Loading the test data
    df_test= spark.read.parquet(sys.argv[1])

    df_test, df_discard = df_test.randomSplit([0.2, 0.8])

    # Load the model
    rf_model=RandomForestClassificationModel.load(sys.argv[2])

    # Make the predictions
    predictions = rf_model.transform(df_test)

    #predictionsRDD=predictions.rdd

    #predictionsRDD.saveAsTextFile(sys.argv[3]+"output.text")

    evaluator_acc = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
    accuracy = evaluator_acc.evaluate(predictions)

    print "accuracy *******************"
    print accuracy

    evaluator_pre = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="weightedPrecision")

    print "precision *******************"
    print evaluator_pre.evaluate(predictions)

    print "recall **********************"
    print MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="weightedRecall").evaluate(predictions)
Project: kafka-spark-influx-csv-analysis    Author: bwsw    | project source | file source
def tearDown(self):
        self.__class__.writer.client.drop_database(self.__class__.influx_options["database"])

    # Temporary commented because we don't understand why this test failed
    # def test_write_rdd_to_influx(self):
    #     struct = ["min_packet_size", "max_traffic"]
    #     config = Config(CONFIG_PATH)
    #
    #     self.__class__.influx_options = config.content["output"]["options"]["influx"]
    #
    #     client = InfluxDBClientMock(self.__class__.influx_options["host"], self.__class__.influx_options["port"],
    #                             self.__class__.influx_options["username"],
    #                             self.__class__.influx_options["password"],
    #                             self.__class__.influx_options["database"])
    #
    #     self.__class__.writer = InfluxWriter(client, self.__class__.influx_options["database"],
    #                                          self.__class__.influx_options["measurement"],
    #                                          struct)
    #     spark = SparkSession \
    #         .builder \
    #         .appName("TestInfluxWriter") \
    #         .getOrCreate()
    #
    #     array = [("91.221.61.168", 68, 34816), ("192.168.30.2", 185, 189440),
    #              ("91.226.13.80", 1510, 773120), ("217.69.143.60", 74, 37888)]
    #
    #     sample = spark.sparkContext.parallelize(array).repartition(1)
    #
    #     write_lambda = self.__class__.writer.get_write_lambda()
    #     write_lambda(sample)
    #
    #     result = self.__class__.writer.client.query(
    #         "select * from {0}".format(self.__class__.influx_options["measurement"]))
    #
    #     points = list(result.get_points())
    #
    #     self.assertEqual(len(points), len(array),
    #                      "In {0} measurement should be written {1} points".format(
    #                          self.__class__.influx_options["measurement"], len(array)))
    #
    #     struct.insert(0, "key")
    #
    #     for p_index, point in enumerate(points):
    #         for s_index, name in enumerate(struct):
    #             self.assertEqual(point[name], array[p_index][s_index],
    #                              "Point {0} field {1} should has value {2}".format(p_index, name,
    #                                                                                array[p_index][s_index]))
    #
    #     spark.stop()
Project: provectus-final-project    Author: eds-uga    | project source | file source
def main():
    spark = SparkSession \
        .builder \
        .appName("RandomForest") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .getOrCreate()

    sc = spark.sparkContext
    sqlContext = SQLContext(sc)

    sc.setLogLevel("INFO")

    # Loading the test data
    df_test = spark.read.parquet(sys.argv[1])

    df_test, df_train = df_test.randomSplit([0.3, 0.7])
    df_train_indexed=df_train.selectExpr("label as indexedLabel","features as indexedFeatures")
    df_test_indexed=df_test.selectExpr("label as indexedLabel","features as indexedFeatures")

    # # Load the model
    # rf_model = RandomForestClassificationModel.load(sys.argv[2])
    #
    # # Make the predictions
    # predictions = rf_model.transform(df_test)
    gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=100,maxBins=24000000)
    model=gbt.fit(df_train_indexed)
    predictions = model.transform(df_test_indexed)

    # predictionsRDD=predictions.rdd

    # predictionsRDD.saveAsTextFile(sys.argv[3]+"output.text")

    evaluator_acc = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="indexedLabel",
                                                      metricName="accuracy")
    accuracy = evaluator_acc.evaluate(predictions)

    print "accuracy *******************"
    print accuracy

    evaluator_pre = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="indexedLabel",
                                                      metricName="weightedPrecision")

    print "precision *******************"
    print evaluator_pre.evaluate(predictions)

    print "recall **********************"
    print MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="indexedLabel",
                                            metricName="weightedRecall").evaluate(predictions)