Spark 是一个可扩展的可编程框架,用于数据集的大规模分布式处理, 称为弹性分布式数据集(Resilient Distributed Datasets,RDD)。
Spark Streaming 是 Spark API 核心的扩展,它支持来自各种来源的流处理。
StreamingPro 是一个可扩展、可编程的 Spark Streaming 框架(也包括 Spark,Storm),可以轻松地用于构建流式应用。
StreamingPro 支持以 Spark、Flink 等作为底层分布式计算引擎,通过一套统一的配置文件完成批处理、流式计算与Rest 服务的开发。 特点有:
使用Json描述文件完成流式,批处理的开发,不用写代码。
支持SQL Server,支持XSQL/MLSQL(重点),完成批处理,机器学习,即席查询等功能。
标准化输入输出,支持UDF函数注册,支持自定义模块开发
支持Web化管理Spark应用的启动,监控
如果更细节好处有:
跨版本:StreamingPro可以让你不用任何变更就可以轻易的运行在spark 1.6/2.1/2.2上。
新语法:提供了新的DSl查询语法/Json配置语法
程序的管理工具:提供web界面启动/监控 Spark 程序
功能增强:2.1之后Structured Streaming 不支持kafka 0.8/0.9 ,Structured,此外还有比如spark streaming 支持offset 保存等
简化Spark SQL Server搭建成本:提供rest接口/thrift 接口,支持spark sql server 的负载均衡,自动将driver 注册到zookeeper上
探索更多的吧
如果你使用StreamingPro,那么所有的工作都是在编辑一个Json配置文件。通常一个处理流程,会包含三个概念:
多个输入
多个连续/并行的数据处理
多个输出
StreamingPro会通过’compositor’的概念来描述他们,你可以理解为一个处理单元。一个典型的输入compositor如下:
{ "name": "batch.sources", "params": [ { "path": "file:///tmp/hdfsfile/abc.txt", "format": "json", "outputTable": "test" }, { "path": "file:///tmp/parquet/", "format": "parquet", "outputTable": "test2" } ] }
batch.sources 就是一个compositor的名字。 这个compositor 把一个本地磁盘的文件映射成了一张表,并且告知系统,abc.txt里的内容 是json格式的。这样,我们在后续的compositor模块就可以使用这个test表名了。通常,StreamingPro希望整个处理流程, 也就是不同的compositor都采用表来进行衔接。
batch.sources
test
StreamingPro不仅仅能做批处理,还能做流式,流式支持Spark Streaming,Structured Streaming。依然以输入compositor为例,假设 我们使用的是Structured Streaming,则可以如下配置。
{ "name": "ss.sources", "params": [ { "format": "kafka9", "outputTable": "test", "kafka.bootstrap.servers": "127.0.0.1:9092", "topics": "test", "path": "-" }, { "format": "com.databricks.spark.csv", "outputTable": "sample", "header": "true", "path": "/Users/allwefantasy/streamingpro/sample.csv" } ] }
第一个表示我们对接的数据源是kafka 0.9,我们把Kafka的数据映射成表test。 因为我们可能还需要一些元数据,比如ip和城市的映射关系, 所以我们还可以配置一些其他的非流式的数据源,我们这里配置了一个smaple.csv文件,并且命名为表sample。
如果你使用的是kafka >= 1.0,则 topics 参数需要换成’subscribe’,并且使用时可能需要对内容做下转换,类似:
select CAST(key AS STRING) as k, CAST(value AS STRING) as v from test
启动时,你需要把-streaming.platform 设置为 ss。
ss
如果我们的输入输出都是Hive的话,可能就不需要batch.sources/batch.outputs 等组件了,通常一个batch.sql就够了。比如:
"without-sources-job": { "desc": "-", "strategy": "spark", "algorithm": [], "ref": [], "compositor": [ { "name": "batch.sql", "params": [ { "sql": "select * from hiveTable", "outputTableName": "puarquetTable" } ] }, { "name": "batch.outputs", "params": [ { "format": "parquet", "inputTableName": "puarquetTable", "path": "/tmp/wow", "mode": "Overwrite" } ] } ], "configParams": { } }
在批处理里,batch.sources/batch.outputs 都是可有可无的,但是对于流式程序,stream.sources/stream.outputs/ss.sources/ss.outputs 则是必须的。
后面三个参数值得进一步说明:
假设我们定义了两个数据源,firstSource,secondSource,描述如下:
{ "name": "batch.sources", "params": [ { "name":"firstSource", "path": "file:///tmp/sample_article.txt", "format": "com.databricks.spark.csv", "outputTable": "article", "header":true }, { "name":"secondSource", "path": "file:///tmp/sample_article2.txt", "format": "com.databricks.spark.csv", "outputTable": "article2", "header":true } ] }
我们希望path不是固定的,而是启动时候决定的,这个时候,我们可以在启动脚本中使用-streaming.sql.source.[name].[参数] 来完成这个需求。 比如:
-streaming.sql.source.firstSource.path file:///tmp/wow.txt
这个时候,streamingpro启动的时候会动态将path 替换成你要的。包括outputTable等都是可以替换的。
有时候我们需要定时执行一个任务,而sql语句也是动态变化的,具体如下:
{ "name": "batch.sql", "params": [ { "sql": "select * from test where hp_time=:today", "outputTableName": "finalOutputTable" } ] },
这个时候我们在启动streamingpro的时候,通过参数:
-streaming.sql.params.today "2017"
动态替换 sql语句里的:today