流数据服务(Streaming Service)以流处理模型作为服务来源,其中指定了运行服务所需的信息。
流数据处理流程包含:

依据上述处理流程,流数据的处理模型包含四个部分:Receiver(接收器)、Filter(过滤器)、Mapper(转换器)和Sender(发送器)。每个部分作为一个节点,可以进行连接和合并,构建成实时数据处理流 Stream。除了处理流 Stream 以外,还有一些辅助参数作为整个服务的运行条件,一并存储在启动参数类型 Startup 中。处理模型如下图:

配置参数
流处理模型采用 JSON 格式定义,您可以参考下文介绍的参数及相应的 JSON 示例,编写一个流处理模型文件并发布为流数据服务。您也可以使用流处理模型编辑器构建模型,查看参数说明即可。
SparkParameter
用于设置 Spark Streaming 的运行参数。包括:
- checkPointDir:设置 Streaming 的CheckPoint功能的保存目录。String 类型。
- interval:int 类型。设置 Streaming 运行的间隔时间,单位为毫秒。
Stream
Stream 中包含了实时数据处理运行流的参数。
- nodeDic:存放运行的节点的词典。
- StreamNode:运行流节点基类,记录节点的基本信息如name、source和description,以及最重要的节点运行的先后顺序。采用prevNodes记录当前节点的前序节点列表,nextNodes记录当前节点后序节点列表。 prevNodes 和 nextNodes 的值是一个数组,数组元素是节点的名称,即每个节点可以有多个前序节点,也可以有多个后序节点。对于 Receiver 节点,prevNodes 为空;对于Sender 节点,nextNodes 为空。 如果某个节点有多个前序节点,那么,这些前序节点的 metadata 必须相同,否则,执行会报错。
Receiver
继承自StreamNode,作为流数据处理的入口,接收各种来源的数据,包括Socket、WebSocket、Http、文件系统等。Receiver中需要设置接收信息的元数据,即metadata。Receiver节点包括三个部分组成:自身的描述信息如name、source等;消息的元数据metadata;消息的读取格式reader。
流数据服务支持以下接收方式:
SocketReceiver:继承自Receiver,接收Socket消息的节点。需指定的参数有:
ipAddress——String 类型。接收的Socket服务的IP地址
port——int 类型,接收的Socket服务的端口号
示例:
{
"ipAddress" : "127.0.0.1",
"port" : 9527,
"name" : "socketReceiver",
"source" : "Socket Receiver",
"description" : "Receive some message from socketServer",
"prevNodes" : [],
"nextNodes" : [],
"className": "com.supermap.bdt.streaming.receiver.SocketReceiver "
}
MultiSocketReceiver:继承自Receiver,同时接收多个Socket消息的节点,接收的消息内容必须是相同的。需指定的参数有:
servers——Array[String] 类型。需要接收的多个服务地址,每个数组对象为一个地址,地址与端口用冒号隔开。
示例:
{
"servers": [
"192.168.1.1:9527",
"192.168.1.1:9528",
"192.168.1.2:9527"
],
"name": "multiSocketReceiver",
"source": "MultiSource Socket Receiver",
"description": "Receive message from multi socket server",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.MultiSocketReceiver"
}
SocketServerReceiver:继承自Receiver,Socket服务端接收节点,用于作为服务端接收其他Socket客户的发送的消息。需指定的参数有:
port:int型。启动的Socket服务端监听端口。
{
"port": 9527,
"name": "socketServerReceiver",
"source": "SocketServer Receiver",
"description": "Receive message from socket client",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.SocketServerReceiver"
}
WebSocketReceiver:继承自Receiver,接收WebSocket消息的节点。需指定的参数有:
url:String 类型。WebSocket服务地址。
{
"url": "ws://192.168.1.1:9527/websocket ",
"name": "webSocketReceiver",
"source": "WebSocket Receiver",
"description": "Receive message from websocket server",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.WebSocketReceiver"
}
TextFileReceiver:继承自Receiver,监控指定目录,读取新增文件的内容。需指定的参数有:
directoryPath:监控的文件目录,如HDFS目录hdfs:///data/;Linux系统中的目录 /user/share/data;Windows系统中的目录C:/data。
{
"directoryPath": "'hdfs:///data/'",
"name": "textFileReceiver",
"source": "Text File Receiver",
"description": "Listen new file in folder",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.TextFileReceiver"
}
SingleTextFileReceiver:单文本文件接收器,继承自Receiver,根据设置读取监控文件的内容,支持读取 Json、GeoJSON 和 CSV格式的文件。需指定的参数有:
readInterva:读取时间的间隔。
rowsOneTime:每次读取的行数。
filePath:需要发送的文件的路径,注意,此处应填写绝对路径。
{
"version": 9000,
"sparkParameter": {
"checkPointDir": "tmp",
"interval": 5000
},
"stream": {
"nodeDic": {
"TextFileReceiver": {
"filePath": "G:\\QQRev\\test.json",
"readInterva": 1000,
"rowsOneTime": 100,
"reader": {
"isJsonArray": false,
"arrayExpression": "",
"className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
},
"metadata": {
"title": "",
"epsg": 3857,
"fieldInfos": [
{
"name": "X",
"source": "lon",
"nType": "DOUBLE"
},
{
"name": "Y",
"source": "lat",
"nType": "DOUBLE"
},
{
"name": "mbbh",
"source": "mbbh",
"nType": "TEXT"
}
],
"featureType": "POINT",
"idFieldName": "mbbh",
"dateTimeFormat": "yyyy-MM-dd HH:mm:ss"
},
"name": "TextFileReceiver",
"caption": "",
"description": "",
"prevNodes": [],
"nextNodes": [
"ConsoleSender"
],
"className": "com.supermap.bdt.streaming.receiver.SingleTextFileReceiver"
},
"ConsoleSender": {
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
},
"name": "ConsoleSender",
"caption": "",
"description": "",
"prevNodes": [
"TextFileReceiver"
],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.ConsoleSender"
}
}
}
}
KafkaReceiver:继承自Receiver,接收kafka消息的节点。需指定的参数有:
- servers——String 类型。服务器列表,如果有多个服务器之间用逗号分隔,端口用冒号“:”分隔
- topics——Array[String] 类型。kafka topic 数组
- groupid—— String 类型。kafka 的 groupID
- offset——String类型。kafka的读取位置,默认为latest,即只读取最新消息
{ "servers": "192.168.1.1:9092, 192.168.1.2:9092, 192.168.1.3:9092 ,192.168.1.4:9092",
"topics": [
"topic1",
"topic2"
],
"groupid": "groupId",
"offset": "latest",
"name": "kafkaReceiver",
"source": "Kafka Receiver",
"description": "Receive message from Kafka",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.KafkaReceiver"
}
HttpReceiver: 继承自Receiver,接收 HTTP 的消息节点,目前只支持HTTP的Get方法。
url:String类型。Http服务地址。
{
"url": "https://api.wheretheiss.at/v1/satellites/25544",
"name": "httpReceiver",
"source": "HTTP Receiver",
"description": "Get message from web",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.HttpReceiver"
}
JMSReceiver:继承自Receiver,接收JMS标准协议消息的节点,用于接收ActiveMQ、RabbitMQ等消息中间件的消息。
url——JMS消息服务地址
port——int类型。消息服务端口
queueName ——String类型。 消息队列名称
jdniName——String类型。对应消息中间件的JDNI名称,需要到中间件官网查询
username——String类型。用户名
password——String类型。密码
{
"url": "192.168.1.1",
"port": 9527,
"queueName": "data",
"jdniName": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
"userName": "user",
"password": "password",
"name": "jmsReceiver",
"source": "JMS Receiver",
"description": "Receive message from JMS(Java Message Service) for ActiveMQ",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.JMSReceiver"
}
metadata
metadata 写在 Receiver 参数中,是接收消息的元数据,用于描述消息的格式定义。需指定以下信息:
- title:元数据的名称,用于区分其他元数据。String类型
- featureType:FeatureType类。接收消息转换的地理要素类型,如点POINT、线LINE、面REGION等。
- epsg:int 类型。元数据地理要素的投影EPSG编码。
- fieldInfos:接收消息转换后的字段信息。需指定:
- name:String类型。字段名称,为字段的唯一标识
- source:String类型。字段在原始信息中的位置,决定了从原始信息中的什么位置去解析成为本字段的值。成为本字段的值。如果原始信息为 CSV 格式,source 值为 CSV 中的字段序号,如 "source": "4" 代表了 CSV 数据中的第 5 个字段;如果原始数据为 json 格式的,那么 source 值为 json 中键值对的键。
- nType:FieldType 类型。字段的类型,如:字符型 TEXT、双精度浮点型 DOUBLE、整型 INT 等。
reader
接收的消息的内容格式,包括CSV格式(CSVFormatter)、JSON格式(JsonFormatter)或者GeoJSON格式(GeoJsonFormatter)。
CSVFormatter:表示接收的消息的内容格式为CSV格式。需指定:
separator:指定分隔符,默认为逗号
"reader": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
}
JsonFormatter:接收的消息内容格式为 JSON。示例如下:
"reader": {
"className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
}
GeoJsonFormatter:接收的消息内容格式为 GeoJSON。示例如下:
"reader": {
"className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
}
Filter
继承于StreamNode,用于过滤当前数据,进行数据的清洗与整理。
逻辑运算式过滤
String filter——过滤器内容,为一个逻辑运算式,将会保留该运算式结果为 true 的对象。
如果需要获取字段值进行逻辑运算,使用关键词[],如[ID] > 10。多个运算式直接可以使用&&(与)或者 ||(或)进行连接,并使用括号()进行优先顺序调整,如[ID] > 10 && ([X] >= 10 || [Y] <= 65.32)。注意,使用关键词IN、MATCHES、EXISTS、ISNULL与其他运算式一起进行运算时,必须使用括号包括起来,如 ([ID] IN 1,3,5,7,9) && [X] > 100。
表1 filter参数支持的逻辑运算符列表
运算符 |
描述 |
| == | 等于 (==) 该运算符保留属性值等于指定值的对象。例如,[ID] == 3。 注意:double类型慎用,对比精度为10E-10. |
| != | 不等于 (!=) 该运算符保留属性值不等于指定值的对象。例如,[Name] != “A”。 注意:double类型慎用,对比精度为10E-10. |
| > | 大于 (>) 该运算符保留属性值大于指定值的对象。例如,[Speed] > 50 |
| >= | 大于或等于 (>=) 该运算符保留属性值大于或等于指定值的对象。例如,[Speed] >= 50 |
| < | 小于 (<) 该运算符保留属性值小于指定值的对象。例如,[X] < 10.231 |
| <= | 小于或等于 (<=) 该运算符保留属性值小于或等于指定值的对象。例如,[Y] <= 40 |
| IN | IN 在指定列表中 当在逗号分隔的值列表中存在指定字段的值时,该运算符保留对象。例如,[Code] IN HK1,HK3,HK5 |
| MATCHES | MATCHES 正则表达式匹配 当指定字段的值与正则表达式相匹配时,该运算符保留对象。例如,[Code] MATCHES “^HK[135]” 注意:需要匹配的正则表达式需要用“”引号包含起来 |
| EXISTS | EXISTS 字段是否存在 当已接收的事件方案中存在指定字段时,该运算符保留对象。例如,EXISTS [X]。 |
| ISNULL | ISNULL 是否为空 当指定字段包含空值时,该运算符保留对象。例如, [X] ISNULL。 |
重要说明:filter 指定的逻辑运算式,其中使用的字符串必须使用双引号(“”)或者单引号(‘’)将字符串扩起来,并且双引号(“”)和单引号(‘’)本身要使用\进行转义,例如:字符串“string”在表达式中要写成:\”string\”或者\’string\’。另外,对于 MATCHES 正则表达式匹配,其中用于匹配的字符“\\”需要对每一个“\”进行转义,所以“\\”字符要写成“\\\\”。示例:
{
"filter": "([X] > [Y] && [X] > 20) || ([ID] IN 1,2,3,4,5)",
"name": "Filter",
"caption": "Attribute Filter",
"description": "Filter feature by expression",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.filter.FeatureFilter"
}
地理过滤
地理过滤是通过空间位置关系过滤地理对象的过滤方式。
- connection:用于地理过滤对象的来源。
- String type——数据源类型。
- Array[DsInfo] Info——数据源连接信息。
- String server:数据服务器
- Array[String] datasetNames:用于地理过滤的数据集名称
- mode:目标对象与地理过滤对象的空间关系。例如:inside 表示,处于地理过滤对象内部的目标对象为过滤结果。
示例:
"geoFilter": {
"connection": {
"type": "udb",
"info": [
{
"server": "Z:\\airport.udb",,
"datasetNames": [
"airports_40"
] } ]
},
"mode": "inside",
"name": "geoFilter",
"caption": "GeoFencing Filter",
"description": "Filter feature with geofencing",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.filter.GeoFilter"
}
Mapper
继承于StreamNode,用于建立字段映射以及对字段进行管理,主要包括:字段映射、添加字段、删除字段、字段运算以及地理围栏。
添加字段
- insertIndex——int类型。字段添加的位置
- fieldname——String类型。添加的字段名称
- nType——SFieldType类型。添加的字段类型
- expression——String类型。添加字段值可以为已有字段的运算表达式的值:
"insertMapper": {
"insertIndex": 1,
"fieldName": "XX",
"nType": "DOUBLE",
"expression": "[X] * 2",
"name": "insertMapper",
"source": "Insert Field",
"description": "Insert Field by X * 2",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.map.FeatureInsertMapper"
}
删除字段
- deleteFieldNames:删除字段名称数组
"deleteMapper": {
"deleteFieldNames": [
"F1",
"F2"
],
"name": "deleteMapper",
"source": "delete Field",
"description": "delete Field F1和F2 ,
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.map.FeatureDeleteMapper"
}
字段映射
- srcToDesIndexPair:源数据中的字段(字段在流中的序号)与名称的对照关系
srcToDesNamePair:字段名称与新名称的对照关系
"mapMaper": {
"srcToDesNamePair": {
"ID": "newID_Name",
"Y": "newY_Name",
"X": "newX_Name"
},
"srcToDesIndexPair": {
"ID": 0,
"Y": 2,
"X": 1
},
"name": "mapMaper",
"source": "Map Fields",
"description": "Map Fields with new name and index",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.map.FeatureMapMapper"
}
字段运算
- fieldName:目标字段名称
- expression:字段的运算表达式
"calculateMapper": {
"fieldName ": Fcal,
"expression": "[X] * 2",
"name": "calculateMapper",
"source": "calculate Field",
"description": "calculate Field by X * 2",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.map.FeatureCalculateMapper "
}
对于字段运算,除了支持+、-、x、/等数学运算外,还支持数学函数和字符串处理函数。
函数 |
说明 |
| ABS(value) | 返回参数的绝对值 |
| FLOOR(double) | 返回指定双精度值的下限(接近的最小整数) |
| MAX(value a, value b) | 返回 2 个指定参数值中较大的值 |
| MIN(value a, value b) | 返回 2 个指定参数值中较小的值 |
| ROUND(value) | 返回最接近参数的长整数(假设参数为双精度值) |
| NOW() | 返回当前系统时间 |
| UPPERCASE | 字符串处理函数,返回字符串的大写形式,例如: "\"xyz\".UPPERCASE,表示将字符串“xyz”转换为大写 字母返回,处理结果为“XYZ” |
| LOWERCASE | 字符串处理函数,返回字符串的小写形式,例如: "\"ABC\".UPPERCASE,表示将字符串“ABC”转换为小 写字母返回,处理结果为“abc” |
| REPLACE(string1,string2) | 字符串处理函数,将原字符串中的 string1 部分替换为 string2,例如: "\"ABCxyz\".REPLACE(\"AB\",\"MM\"),表示将字符串 中的 “AB”替换为“MM”,处理结果为:MMCxyz |
SUBSTRING(location1, strcount) |
字符串处理函数,将原字符串中 location1 指定位置开 始,取出 strcount 个字符返回。例如: \"ABCxyz\".SUBSTRING(0, 3),表示将目标字符串从第一 个字符开始取出 3 个字符返回,处理结果为:ABC |
注意:expression 指定的运算表达式,其中使用的字符串必须使用双引号(“”)或者单引号(‘’)将字符串扩起来,并且双引号(“”)和单引号(‘’)本身要使用\ 进行转义,例如:字符串“string”在表达式中要写成:\”string\”或者\’string\’。
地理围栏
- connection:地理围栏对象的来源。
type——String类型。数据源类型
Info——Array[DsInfo]类型。数据源连接信息。
- server:数据服务器
- datasetNames:用Array[String]类型。用于地理围栏的数据集名称
- fenceName——String类型。进入地理围栏对象的Name属性字段。
- fenceID——String类型。进入地理围栏对象的ID字段,也就是唯一标识该对象的字段。
- withinFieldName——String类型。新增字段的字段名称,该字段用于记录当前对象是否在地理围栏内。
- statusFieldName——String类型。新增字段的字段名称,该字段用于记录当前对象的状态是进入地理围栏还是离开地理围栏。
"GeoFenceMapper": {
"connection": {
"type": "udb",
"info": [
{
"server": "Z: \\airport.udb",
"datasetNames": [
"airports_40"
]
}
]
},
"fenceName": "NAME",
"fenceID": "SmID",
"withinFieldName": "geoWithin",
"statusFieldName": "geoStatus",
"name": "GeoFenceMapper",
"source": "地理标记转换",
"description": "",
"prevNodes": [
"SocketReceiver"
],
"nextNodes": [
"GeoJsonSocketSender",
"FenceWithinFilterOut",
"FenceWithinFilterIn"
],
"className": "com.supermap.bdt.streaming.map.GeoTaggerMapper"
}
静态资源扩展
- csvFile:要连接的文件目录
- idFields:与待连接数据的匹配字段,可设置多个。
"StaticRDDJoinMapper": {
"className": "com.supermap.bdt.streaming.map.StaticRDDJoinMapper",
"caption": "StaticRDDJoinMapper",
"name": "StaticRDDJoinMapper",
"nextNodes": [],
"prevNodes": [],
"description": "StaticRDDJoinMapper",
"csvFile": "D:\\supermap\\soft\\esriunittype.csv",
"idFields": ["yard"]
}
Sender
继承于StreamNode,作为流数据处理的出口,向外发送数据。包含:
- EsAppendSender
- EsUpdateSender
- FileSender
- JMSSender
- SMSSender
- SocketClientSender
- SocketServerSender
- WebSocketClientSender
WebSocketClientSender
WebSocket 发送节点,用于将消息发送到 WebSocket
String path——WebSocket 服务地址。
注:通过 iServer 订阅功能接收数据,仅支持 GeoJsonFormatter。
示例:
"webSocketClientSender": {
"path": "ws://127.0.0.1/data",
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
},
"name": "webSocketClientSender",
"caption": "WebSocket Sender",
"description": "Send message by WebSocket",
"prevNodes": [],
"nextNodes": [],
"className":
"com.supermap.bdt.streaming.sender.WebSocketClientSender"
}
EsAppendSender
EsAppendSender 向 Elasticsearch 引擎新增数据的节点。可保存传入的所有数据,可在需要保存 streaming 的历史数据时使用。
String url——ES 服务地址加端口
String queueName——ES 节点名称
String directoryPath——ES 类型名称
示例:
"ESAppendSender": {
"url": "127.0.0.1:9200",
"queueName": "aircondition",
"directoryPath": "test1",
"formatter": {
"className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
},
"name": "ESAppendSender",
"caption": "ES 发送器",
"description": "",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.EsAppendSender"
}
EsUpdateSender
向 Elasticsearch 引擎新增与更新数据的节点。需要设置发送消息的 ID 字段,如果发送的消息内容中 ID 字段的值已经在 Elasticsearch 引擎中存在,则将更新该记录;如果 Elasticsearch 引擎中没有对应的 ID 值,则会新增一条记录。
String url——ES 服务地址
String port——ES 服务端口
String index——ES 节点名称
String typ——ES 类型名称
String idFieldName——唯一标识字段名称,用于查找需要更新的记录
示例:
"ESUpdateSender":
"url":"127.0.0.1",
" port":"9200",
" index":" aircondition ",
"typ":" test1",
"idFieldName":"id",
"name":"ESUpdateSender",
"caption":"ESUpdateSender",
"description":"Send message to Elasticsearch",
"prevNodes":[],
"nextNodes":[],
"className":"com.supermap.bdt.streaming.sender.ESUpdateSender"
}
FileSender
文件型发送节点,用于将消息保存到指定文件中。
String filePath —— 输出文件路径
示例:
"fileSender": {
"filePath": "C:\\result.csv",
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
},
"name": "fileSender",
"caption": "",
"description": "",
"prevNodes": [
"filter"
],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.FileSender"
}
JMSSender
发送 JMS 标准协议消息的节点,用于将消息发送到 ActiveMQ、RabbitMQ 等消息中间件。
String url——JMS 消息服务地址
Int port——消息服务端口
String queueName ——消息队列名称
String jdniName——对应消息中间件的 JDNI 名称,需要到中间件官网查询
String username——用户名
String password——密码
示例:
"JMSSender": {
"url": "192.168.168.33",
"port": 61616,
"queueName": "myTestJDNI",
"jdniName": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
"userName": "admin",
"password": "admin",
"formatter": {
"className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
},
"name": "JMSSender",
"caption": "",
"description": "",
"prevNodes": [
"TextFileReceiver"
],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.JMSSender"
}
SMSSender
短信消息发送节点,SMSSender 使用中国网建(http://www.webchinese.com.cn/)提供的 API 接口发送短信消息,注册中国网建用户后获得用户名和接口安全秘钥,即可发送短信消息。
注意:发送的内容审核时间大概 30 分钟以内,所以接收消息会有延迟。
String user——webchinese 用户名
String apiKey——webchinese 接口安全秘钥
java.util.List[String] phoneNumbers——发送的手机号码列表
int sendLimit——发送数量限制,避免消息频繁消耗短信次数,可以设置本次运行最多发送的条数。
示例:
"smsSender": {
"user": "user",
"apiKey": "apiKey",
"phoneNumbers": [
"13800000000"
],
"sendLimit": 100,
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
},
"name": "smsSender",
"caption": "SMS Sender",
"description": "Send message by SMS",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.SMSSender"
}
SocketClientSender
Socket 客户端发送节点,通过 Socket 客户端连接将消息发送到 Socket 服务端。
String ip——接收的 Socket 服务的 IP 地址;
Int port——接收的 Socket 服务的端口号。
示例:
"socketClientSender": {
"ip": "127.0.0.1",
"port": 9527,
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
},
"name": "socketClientSender",
"caption": "SocketClient Sender",
"description": "Send message by Socket",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.SocketClientSender"
}
SocketServerSender
Socket 服务端发送节点,启动 Socket 服务端,将消息发送到连接的 Socket 客户端。
Int port——启动 Socket 服务的端口号。
示例:
"socketServerSender": {
"port": 9527,
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
},
"name": "socketServerSender",
"caption": "SocketServer Sender",
"description": "Send message by Socket",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.SocketServerSender"
}
参照上述示例设置,可以将数据的分析处理结果输出到 iServer DataStore 创建的时空数据库中。
编写流处理模型文件
根据上述参数说明,编写一个完整的流处理模型文件,您可以将文件另存为后缀为.streaming的文件,用于快速发布流数据服务,也可以将流处理模型文件的内容直接写入“配置信息”中进行发布。示例如下:
{
"version": 9000,
"sparkParameter": {
"checkPointDir": "tmp",
"interval": 10000
},
"stream": {
"nodeDic": {
"AQIReceiver": {
"url": "http://www.supermapol.com/iserver/services/aqi/restjsr/aqi/pm2_5.json?bounds=-113.90625001585,-52.029966847235,113.90625001585,69.175579762077&to=910111",
"reader": {
"isJsonArray": true,
"arrayExpression": "airQualityList",
"className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
},
"metadata": {
"title": "",
"epsg": 3857,
"fieldInfos": [
{
"name": "X",
"source": "location.x",
"nType": "DOUBLE"
},
{
"name": "Y",
"source": "location.y",
"nType": "DOUBLE"
},
{
"name": "positionName",
"source": "positionName",
"nType": "TEXT"
},
{
"name": "aqi",
"source": "aqi",
"nType": "DOUBLE"
}
],
"featureType": "POINT"
},
"name": "AQIReceiver",
"caption": "",
"description": "",
"prevNodes": [],
"nextNodes": [
"WebSocketClientSender"
],
"className": "com.supermap.bdt.streaming.receiver.HttpReceiver"
},
"WebSocketClientSender": {
"path": "ws://127.0.0.1:8800/iserver/services/dataflow/dataflow/broadcast",
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
},
"name": "WebSocketClientSender",
"caption": "",
"description": "",
"prevNodes": [
"AQIReceiver"
],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.WebSocketClientSender"
}
}
}
}