应用
5. 资料来源
5.1. CDC来源
变更数据采集(CDC)源它捕获并流传输来自各种数据库的变更事件。
目前,它支持MySQL,PostgreSQL,MongoDB,神谕和SQL Server数据库。
基于Debezium嵌入式连接器,CDC来源允许通过不同的消息绑定器(如 Apache Kafka、RabbitMQ 及所有 Spring Cloud Stream 支持代理)捕获和流式数据库更改。
它支持所有 Debezium 配置属性。只需添加cdc.config。为现有的Debezium性质前缀。例如,将Debezium设为connector.class财产 使用cdc.config.connector.class而是用源属性。
我们为最常用的Debezium属性提供便捷快捷方式。例如,代替长cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnectorDebezium的财产你可以使用我们的CDC.connector=mySQL捷径。下表列出了所有可用的快捷方式及其所代表的Debezium属性。
Debezium 性质(例如cdc.config.XXX)总是优先于捷径!
CDC来源引入了新的默认设置BackingOffsetStore配置,基于MetadataStore服务。Later 提供了多种对微服务友好的偏移元数据存储方式。
5.1.1. 期权
按前缀分组的属性:
CDC
- 配置
-
Debezium配置属性的Spring通槽包裹器。所有带有“cdc.config.”前缀的属性都是Debezium的原生属性。前缀被移除,转为 Debezium io.debezium.config.Configuration。(Map<String, String>,默认:
<没有>) - 连接器
-
cdc.config.connector.class 属性的快捷方式。只要不互相矛盾,这两种方式都可以使用。(ConnectorType,默认:
<没有>,可能的值:MySQL,后文,蒙戈德,神谕,SQLserver) - 名称
-
该 sourceConnector 实例的唯一名称。(字符串,默认:
<没有>) - 图式
-
将这些模式作为出站消息的一部分包含。(布尔值,默认:
false)
CDC.平坦化
- 加法场
-
逗号分隔元数据字段列表,添加到扁平消息中。字段前缀会加上“__”或“__[<]struct]__”,具体取决于结构体的具体要求。(字符串,默认:
<没有>) - 加头
-
逗号分隔列表指定一组元数据字段,添加到扁平消息的头部。字段前缀为“__”或“__[struct]__”。(字符串,默认:
<没有>) - 删除处理模式
-
处理已删除记录的选项包括:(1) 无记录 - 传递记录,(2) 丢弃 - 删除记录 和 (3) 重写 - 为记录添加“__deleted”字段。(DeleteHandlingMode,默认:
<没有>,可能的值:落,重写,没有) - 落墓碑
-
默认情况下,Debezium 会生成墓碑记录,以实现已删除记录的卡夫卡压缩。墓碑的投放可以压制墓碑记录。(布尔值,默认:
true) - 启用
-
启用平坦化源记录事件(https://debezium.io/docs/configuration/event-flattening)。(布尔值,默认:
true)
cdc.offset
- 提交超时
-
等待记录刷新并提交偏移数据到偏移存储前,最多有毫秒数,然后取消进程并恢复未来提交的偏移数据。(持续时间,默认:
5000毫秒) - 同花间隔
-
尝试提交抵消的间隔。默认是1分钟。(持续时间,默认:
60000毫秒) - 政策
-
偏移存储提交策略。(OffsetPolicy,默认:
<没有>) - 存储
-
Kafka 连接器跟踪处理记录的数量,并定期将计数(作为“偏移量”)存储在预配置的元数据存储中。重启后,连接器会从最后记录的源偏移量开始读取。(OffsetStorageType,默认:
<没有>,可能的值:存储,文件,卡 夫 卡,元数据)
cdc.stream.header
- 转换-连接-头部
-
当为真时,{@link org.apache.kafka.connect.header.Header} 会转换为消息头,名称为 {@link org.apache.kafka.connect.header.Header#key()},{@link org.apache.kafka.connect.header.Header#value()}。(布尔值,默认:
true) - 抵消
-
将源记录的偏移元数据序列化到cdc.offset下的外发消息头部。(布尔值,默认:
false)
Metadata.store.dynamo-db
- 创建延迟
-
创建表格重试之间的延迟。(整数,默认:
1) - 创建重试
-
创建表格请求的重试编号。(整数,默认:
25) - 读取容量
-
桌面上的读取容量。(长,默认:
1) - 桌子
-
元数据的表名。(字符串,默认:
<没有>) - 生存时间
-
表格条目的时间线。(整数,默认:
<没有>) - 写容量
-
写入容量放在桌面上。(长,默认:
1)
metadata.store
- 类型
-
表示要配置的元数据存储类型(默认为“内存”)。使用持久存储必须包含相应的 Spring Integration 依赖。(StoreType,默认:
<没有>,可能的值:蒙戈德,宝石火,Redis,迪纳莫德,JDBC,Zookeeper,Hazelcast,存储)
元数据.store.zookeeper
- 连接串
-
Zookeeper 连接字符串格式为 HOST:PORT。(字符串,默认:
127.0.0.1:2181) - 编码
-
用于在Zookeeper中存储数据时使用的编码。(字符集,默认:
UTF-8) - 重试区间
-
Zookeeper作的重试间隔以毫秒计。(整数,默认:
1000) - 根
-
根节点——存储条目是该节点的子节点。(字符串,默认:
/SpringIntegration-元数据存储)
Debezium 属性快捷方式映射
下表列出了所有可用的快捷方式及其所代表的Debezium属性。
| 捷径 | 源语言 | 描述 |
|---|---|---|
CDC.连接器 |
cdc.config.connector.class |
|
cdc.name |
cdc.config.name |
|
cdc.offset.flush-interval |
cdc.config.offset.flush.interval.ms |
|
cdc.offset.commit-timeout |
cdc.config.offset.flush.timeout.ms |
|
cdc.offset.policy |
cdc.config.offset.commit.policy |
|
cdc.offset.storage |
cdc.config.offset.storage |
|
cdc.flattening.drop-tombstones |
cdc.config.drop.tombstones |
|
cdc.flattening.delete-handling-mode |
cdc.config.delete.handling.mode |
|
5.1.3. 示例与测试
[CdcSourceIntegrationTest]()、[CdcDeleteHandlingIntegrationTest]() 和 [CdcFlatteningIntegrationTest]() 集成测试使用测试数据库夹具,运行在本地机器上。
我们使用预构建的debezium docker数据库镜像。
Maven 构建通过Docker-Maven-插件.
要从IDE运行和调试测试,你需要从命令行部署所需的数据库镜像。 下面的说明说明了如何运行预配置的测试数据库,格式为Docker镜像。
MySQL
开始Debezium/Example-MySQL在 Docker 中:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0
|
(可选)用
|
使用以下属性将CDC源代码与MySQL数据库连接:
cdc.connector=mysql (1)
cdc.name=my-sql-connector (2)
cdc.config.database.server.id=85744 (2)
cdc.config.database.server.name=my-app-connector (2)
cdc.config.database.user=debezium (3)
cdc.config.database.password=dbz (3)
cdc.config.database.hostname=localhost (3)
cdc.config.database.port=3306 (3)
cdc.schema=true (4)
cdc.flattening.enabled=true (5)
| 1 | 配置CDC源代码以使用MySqlConnector。(相当于设定cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector). |
| 2 | 元数据用于识别和分发进入事件。 |
| 3 | 连接到运行在 的 MySQL 服务器本地主持人:3306如德贝齐姆用户。 |
| 4 | 包含变更事件值模式来源记录事件。 |
| 5 | 实现CDC事件平坦化。 |
你也可以运行CdcSourceIntegrationTests#CdcMysqlTests使用这个 MySQL 配置。
PostgreSQL
从Debezium/example-postgres:1.0Docker 镜像:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.0
你可以这样连接到这个服务器:
psql -U postgres -h localhost -p 5432
使用以下属性将CDC源代码与PostgreSQL连接:
cdc.connector=postgres (1)
cdc.offset.storage=memory (2)
cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)
cdc.config.database.user=postgres (4)
cdc.config.database.password=postgres (4)
cdc.config.database..dbname=postgres (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=5432 (4)
cdc.schema=true (5)
cdc.flattening.enabled=true (6)
| 1 | 配置CDC来源使用 PostgresConnector。等价于设定cdc.config.connector.class=io.debezium.connector.postgresql.PostgresConnector. |
| 2 | 配置Debezium发动机以使用存储(例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)backing offset store。 |
| 3 | 元数据用于识别和分发进入事件。 |
| 4 | 连接到运行于 的 PostgreSQL 服务器本地主机:5432如后文用户。 |
| 5 | 包含变更事件值模式来源记录事件。 |
| 6 | 实现CDC事件平坦化。 |
你也可以运行CdcSourceIntegrationTests#CdcPostgresTests使用这个 MySQL 配置。
MongoDB
从Debezium/Example-MongodB:0.10Docker 镜像:
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:0.10
初始化库存集合
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
在蒙戈德终端输出,搜索类似的日志条目主持人:“3F95A8A6516E:27017”:
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
加127.0.0.1 3f95a8a6516e进入你的/等等/主持人
使用以下属性将CDC源代码与MongoDB连接:
cdc.connector=mongodb (1)
cdc.offset.storage=memory (2)
cdc.config.mongodb.hosts=rs0/localhost:27017 (3)
cdc.config.mongodb.name=dbserver1 (3)
cdc.config.mongodb.user=debezium (3)
cdc.config.mongodb.password=dbz (3)
cdc.config.database.whitelist=inventory (3)
cdc.config.tasks.max=1 (4)
cdc.schema=true (5)
cdc.flattening.enabled=true (6)
| 1 | 配置CDC来源使用MongoDB连接器。这映射为cdc.config.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector. |
| 2 | 配置Debezium发动机以使用存储(例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)backing offset store。 |
| 3 | 与运行于 的 MongoDB 的连接本地主持:27017如德贝齐姆用户。 |
| 4 | debezium.io/docs/connectors/mongodb/#tasks |
| 5 | 包含变更事件值模式来源记录事件。 |
| 6 | 实现CDC事件平坦化。 |
你也可以运行CdcSourceIntegrationTests#CdcPostgresTests使用这个 MySQL 配置。
SQL Server
开始一个SQLserver来自Debezium/example-postgres:1.0Docker 镜像:
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
请填写debezium的sqlserver教程中的示例数据:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
使用以下属性将CDC源连接到SQLServer:
cdc.connector=sqlserver (1)
cdc.offset.storage=memory (2)
cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)
cdc.config.database.user=sa (4)
cdc.config.database.password=Password! (4)
cdc.config.database..dbname=testDB (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=1433 (4)
| 1 | 配置CDC来源使用 SqlServerConnector。等价于设定cdc.config.connector.class=io.debezium.connector.sqlserver.SqlServerConnector. |
| 2 | 配置Debezium发动机以使用存储(例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)backing offset store。 |
| 3 | 元数据用于识别和分发进入事件。 |
| 4 | 连接到运行于 的 SQL Server本地主持:1433如南非用户。 |
你也可以运行CdcSourceIntegrationTests#CdcSqlServerTests使用这个 MySQL 配置。
神谕
从localhost启动Oracle可访问,并按照Debezium Vagrant设置中描述的配置、用户和授权设置
请填写Debezium的Oracle教程中的示例数据:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
5.2. 文件来源
该应用程序轮询目录,并将新文件或其内容发送到输出通道。 文件源默认以字节数组的形式提供文件内容。 不过,这可以通过 --file.supplier.mode 选项进行自定义:
-
ref 提供 java.io.file 引用
-
行 会逐行拆分文件,并为每行发送一条新消息
-
目录 默认。以字节数组的形式提供文件内容
使用--file.supplier.mode=lines你也可以提供额外的选项--file.supplier.withMarkers=true.
如果设置为 true,底层 FileSplitter 会在实际数据之前和之后发出额外的文件起始和结束标记消息。
这两个额外标记消息的有效载荷为FileSplitter.FileMarker.withMarkers 选项如果未明确设置,默认为 false。
5.2.1. 选项
文件源有以下选项:
按前缀分组的属性:
5.3. FTP源
该源应用程序支持使用FTP协议传输文件。
文件是从远程目录当地应用部署的目录。
源端发出的消息默认以字节数组形式提供。然而,这可以是
通过--模式选择:
-
裁判提供
java.io.file参考 -
线会逐行拆分文件,并为每行发送一条新消息
-
内容默认。以字节数组的形式提供文件内容
使用--模式=线你也可以提供额外的选项--withMarkers=真.
如果设置为true, 基础文件分拆器会在实际数据之前和之后发送额外的文件开头和结束标记信息。
这两个额外标记消息的有效载荷为FileSplitter.FileMarker.选项withMarkers默认为false如果没有明确设置。
另见元数据存储选项,了解可能的共享持久存储配置,以防止重启时出现重复消息。
5.3.2. 输出
5.3.3. 选项
FTP源有以下选项:
按前缀分组的属性:
file.consumer
- markers-json
-
当“fileMarkers == true”时,指定它们应生成为FileSplitter.FileMarker对象还是JSON。(布尔值,默认:
true) - 模式
-
FileReadingMode 用于文件读取源。值包括“ref”——文件对象,“lines”——每行一条消息,或“contents”——内容以字节形式表示。(文件阅读模式,默认:
<没有>,可能的值:裁判,线,内容) - 带标记
-
设置为 true,以便在数据之前或之后发送文件开始/结束文件标记信息。仅适用于FileReadingMode的“行”。(布尔值,默认:
<没有>)
FTP.factory
- 缓存会话
-
缓存会话。(布尔值,默认:
<没有>) - 客户端模式
-
用于FTP会话的客户端模式。(客户端模式,默认:
<没有>,可能的值:积极,被动) - 主机
-
服务器的主机名。(字符串,默认:
本地主持) - 密码
-
连接服务器的密码。(字符串,默认:
<没有>) - 端口
-
服务器的移植。(整数,默认:
21) - 用户名
-
连接服务器的用户名。(字符串,默认:
<没有>)
FTP.Supplier
- 自动-创建-本地-dir
-
设置为true以创建本地目录(如果不存在)。(布尔值,默认:
true) - 延迟-空时
-
当检测到无新文件时的延迟持续时间。(持续时间,默认:
1) - 删除远程文件
-
设置为true以在成功传输后删除远程文件。(布尔值,默认:
false) - 文件名模式
-
一个过滤模式,用来匹配要传输的文件名称。(字符串,默认:
<没有>) - 文件名正则表达式
-
一个过滤正则表达式模式,用于匹配要传输的文件名称。(模式,默认:
<没有>) - 本地导引
-
用于文件传输的本地目录。(文件,默认:
<没有>) - 保留时间戳
-
设置为true以保留原始时间戳。(布尔值,默认:
true) - 远程指令
-
远程FTP目录。(字符串,默认:
/) - 远程文件分隔器
-
远程文件分离器。(字符串,默认:
/) - tmp-file-后缀
-
转移过程中使用的后缀。(字符串,默认:
.tmp)
Metadata.store.dynamo-db
- 创建延迟
-
创建表格重试之间的延迟。(整数,默认:
1) - 创建重试
-
创建表格请求的重试编号。(整数,默认:
25) - 读取容量
-
桌面上的读取容量。(长,默认:
1) - 桌子
-
元数据的表名。(字符串,默认:
<没有>) - 生存时间
-
表格条目的时间线。(整数,默认:
<没有>) - 写容量
-
写入容量放在桌面上。(长,默认:
1)
5.4. 晶洞源
Geode 源会从 Apache Geode EntryEvents 或 CqEvent 中提取的对象流。
5.4.1. 选项
晶洞源具有以下选项:
按前缀分组的属性:
晶洞池
- 连接型
-
指定连接类型:“服务器”或“定位器”。(ConnectType,默认:
<没有>,可能的值:定位,服务器) - 主机地址
-
指定一个或多个 Gemfire 定位器或服务器地址,格式为 [host]:[port]。(InetSocketAddress[],默认:
<没有>) - 支持订阅
-
设置为 true,以启用客户端池的订阅。需要同步更新到客户端缓存。(布尔值,默认:
false)
geode.security.ssl
- 密码
-
将用于安全套接字连接的SSL密码配置为有效密码名称的数组。(字符串,默认:
任何) - keystore类型
-
识别用于SSL通信的密钥存储类型(如JKS、PKCS11等)。(字符串,默认:
开玩笑) - Keystore-URI
-
用于连接晶洞集群的预创建密钥存储URI的位置。(资源,默认:
<没有>) - SSL-keystore-password
-
访问密钥的密码是Truststore。(字符串,默认:
<没有>) - SSL-Truststore-password
-
访问信托存储的密码。(字符串,默认:
<没有>) - TrustStore类型
-
识别用于SSL通信的信任存储类型(例如JKS、PKCS11等)。(字符串,默认:
开玩笑) - Truststore-URI
-
用于连接Geode集群的预创建truststore库URI的位置。(资源,默认:
<没有>) - 用户主目录
-
本地目录用于缓存从 truststoreUri 和 keystoreUri 位置下载的 truststore 和 keystore 文件。(字符串,默认:
用户.home)
5.5. http 源代码
一个源应用程序,监听HTTP请求并将正文作为消息载荷发出。
如果内容类型匹配文本/*或application/json,有效载荷将是字符串,
否则,有效载荷将是一个字节数组。
5.6. JDBC资料来源
该源从RDBMS轮询数据。
本资料完全基于DataSourceAutoConfiguration,更多信息请参见 Spring Boot JDBC 支持。
5.6.2. 选项
jdbc 源代码有以下选项:
按前缀分组的属性:
JDBC.提供商
- 最大行数
-
查询时处理的最大行数。(整数,默认:
0) - 查询
-
用于选择数据的查询。(字符串,默认:
<没有>) - 分裂
-
是否将SQL结果拆分为单独的消息。(布尔值,默认:
true) - 更新
-
一个用于标记轮询消息为“已见”的SQL更新语句。(字符串,默认:
<没有>)
spring.cloud.stream.poller
- 克朗
-
Cron 触发器的 Cron 表达式值。(字符串,默认:
<没有>) - 固定延迟
-
默认轮询器的固定延迟。(长,默认:
1000) - 初始延迟
-
周期性触发的初始延迟。(整数,默认:
0) - 每轮询最大消息数
-
默认轮询器的每轮询最大消息数。(长,默认:
1) - 时间单位
-
时间单元用于延迟值。(TimeUnit,默认:
<没有>,可能的值:纳 秒,微秒,毫秒,秒,纪要,小时,日)
spring.datasource
- 数据
-
数据(DML)脚本资源引用。(List<String>,默认:
<没有>) - 驾驶员类别名称
-
JDBCDrivers的完全合格姓名。默认根据网址自动检测。(字符串,默认:
<没有>) - 初始化模式
-
在确定是否应使用可用的DDL和DML脚本执行DataSource初始化时,该模式适用。(DataSourceInitializationMode,默认:
嵌入式,可能的值:总是,嵌入式,从不) - 密码
-
数据库的登录密码。(字符串,默认:
<没有>) - 图式
-
模式(DDL)脚本资源引用。(List<String>,默认:
<没有>) - 网址
-
数据库的JDBC网址。(字符串,默认:
<没有>) - 用户名
-
数据库用户名登录。(字符串,默认:
<没有>)
另请参见 Spring Boot 文档以获取补充内容数据来源性质和触发属性和MaxMessagesProperties(最大信息属性)用于投票选项。
5.7. JMS 源
JMS 源能够接收来自 JMS 的消息。
5.7.1. 期权
JMS 源代码有以下选项:
按前缀分组的属性:
5.9. 邮件源
一个源应用程序,监听邮件并将消息正文作为消息负载发出。
5.9.1. 期权
邮件源有以下选项:
- mail.supplier.charset
-
字节[]邮件转字符串转换的字符集。(字符串,默认:
UTF-8) - mail.supplier.delete
-
设置为true以删除下载后的邮件。(布尔值,默认:
false) - Mail.supplier.expression
-
配置一个 SpEL 表达式来选择消息。(字符串,默认:
true) - mail.supplier.idle-imap
-
设置为true以使用IdleImap配置。(布尔值,默认:
false) - mail.supplier.java-mail-properties(-mail-properties)
-
JavaMail 属性作为新行分隔的名称-值对字符串,例如 'foo=bar\n baz=car'。(属性,默认:
<没有>) - Mail.supplier.标记为已读
-
设置为true以标记邮件为已读。(布尔值,默认:
false) - Mail.supplier.url
-
邮件连接链接,用于连接邮件服务器,例如“imaps://username:[email protected]:993/Inbox'。(URLname,默认:
<没有>) - Mail.supplier.user-flag
-
当服务器不支持\Recent。(字符串,默认:
<没有>)
5.10. MongoDB 源
该来源来自MongoDB的数据轮询。
本资料完全基于MongoDataAutoConfiguration,更多信息请参阅 Spring Boot MongoDB 支持。
5.10.1. 选项
mongodb 源代码有以下选项:
按前缀分组的属性:
Mongodb.supplier
- 收集
-
查询MongoDB集合。(字符串,默认:
<没有>) - 查询
-
MongoDB查询。(字符串,默认:
{ }) - 查询表达式
-
MongoDB 查询中的 SpEL 表达式为 DSL 格式。(表达式,默认:
<没有>) - 分裂
-
是否将查询结果拆分为单独的消息。(布尔值,默认:
true) - 更新表达式
-
MongoDB 中的 SpEL 表达式是更新 DSL 风格的。(表达式,默认:
<没有>)
spring.data.mongodb
- 认证数据库
-
认证数据库名称。(字符串,默认:
<没有>) - 自动索引创建
-
是否启用自动索引创建。(布尔值,默认:
<没有>) - 数据库
-
数据库名称。(字符串,默认:
<没有>) - 场命名策略
-
字段命名策略的完全限定名称。(职业<?>,默认:
<没有>) - 网格-FS-数据库
-
<缺少文档>(字符串,默认:
<没有>) - 主机
-
Mongo服务器主机。无法用URI设置。(字符串,默认:
<没有>) - 密码
-
Mongo服务器的登录密码。无法用URI设置。(字符[],默认:
<没有>) - 端口
-
Mongo服务器端口。无法用URI设置。(整数,默认:
<没有>) - 复制集名称
-
集群需要副本集名称。无法用URI设置。(字符串,默认:
<没有>) - 乌里
-
Mongo 数据库 URI。不能设置主机、端口、凭据和副本集名称。(字符串,默认:
mongodb://localhost/test) - 用户名
-
登录Mongo服务器的用户。无法用URI设置。(字符串,默认:
<没有>) - UUID 表示
-
将 UUID 转换为 BSON 二进制值时使用的表示方式。(UuidRepresentation,默认:
Java-遗产,可能的值:未具体说明,标准,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)
更多内容请参见 Spring Boot 文档MongoProperties性能。
参见 和触发属性用于投票选项。
5.11. MQTT 来源
能够接收来自MQTT消息的源。
5.12. RabbitMQ 来源
“rabbit”源能够接收来自RabbitMQ的消息。
队列必须在流部署前存在;它们不是自动生成的。 你可以用RabbitMQ的网页界面轻松创建队列。
5.12.3. 选项
兔子资源有以下选项:
按前缀分组的属性:
rabbit.supplier
- 使得重试
-
确实可以启用重试。(布尔值,默认:
false) - 初始重试区间
-
启用重试时的初始重试间隔。(整数,默认:
1000) - 映射请求头
-
这些头部将被映射。(字符串[],默认:
[STANDARD_REQUEST_HEADERS]) - 最大尝试次数
-
启用重试时,最大投递次数。(整数,默认:
3) - 最大重试区间
-
当重试被启用时,重试间隔最大。(整数,默认:
30000) - 自有连接
-
如果是真的,根据启动属性使用单独的连接。(布尔值,默认:
false) - 队列
-
源端会监听消息的队列。(字符串[],默认:
<没有>) - 重新排队
-
拒绝消息是否应重新排队。(布尔值,默认:
true) - 重试乘法
-
当重试开启时,重试回退倍率。(双重,默认:
2) - 交易
-
通道是否被交易。(布尔值,默认:
false)
Spring.rabbitmq
- 地址洗牌模式
-
用于洗牌配置地址的模式。(AddressShuffleMode,默认:
没有,可能的值:没有,随机,顺序) - 地址
-
客户端应连接的地址列表,以逗号分隔。设置好后,主机和端口被忽略。(字符串,默认:
<没有>) - 通道-RPC超时
-
通道中RPC呼叫的续续超时。把它设为零,等待很久。(持续时间,默认:
10米) - 连接超时
-
连接超时。把它设为零,等待很久。(持续时间,默认:
<没有>) - 主机
-
RabbitMQ主机。如果地址被设置了,则忽略。(字符串,默认:
本地主持) - 密码
-
登录以验证经纪人身份。(字符串,默认:
客人) - 端口
-
RabbitMQ移植版。如果地址被设置了,则忽略。默认使用5672,启用SSL则使用5671。(整数,默认:
<没有>) - 出版者-确认-类型
-
出版商类型确认使用。(ConfirmType,默认:
<没有>,可能的值:简单,相关,没有) - 出版商回归
-
是否启用出版商退货。(布尔值,默认:
false) - 请求通道最大值
-
客户端请求的每个连接的频道数量。无限用0。(整数,默认:
2047) - 请求心跳
-
请求心跳暂停;零对零。如果未指定持续时间后缀,则使用秒。(持续时间,默认:
<没有>) - 用户名
-
登录用户以向经纪人进行身份验证。(字符串,默认:
客人) - 虚拟主机
-
连接经纪商时使用的虚拟主机。(字符串,默认:
<没有>)
另请参阅 Spring Boot 文档,了解代理连接和监听者属性的附加属性。
关于重试的说明
使用默认的ackMode(AUTO)和requeue(true)选项,失败的消息传递将被重新尝试
无限期。
由于兔源处理较少,源本身故障的风险较小,除非
下游粘结 剂不知为何没有连接。
将 requeue 设置为 false 会导致消息在第一次尝试时被拒绝(甚至可能发送到死信
如果经纪人配置了交换/队列)。
enableRetry 选项允许配置重试参数,使得失败的消息传递可以重新尝试,
当重试次数用尽时,最终会被丢弃(或写死字母)。
在重试期间,传递线程会被暂停。
重试选项有 enableRetry、maxAttempts、initialRetryInterval、retryMultiplier 和 maxRetryInterval。
带有MessageConversionException的消息传递失败时,绝不会重试;假设如果
第一次尝试无法转换,后续尝试也会失败。
此类消息会被丢弃(或死字母处理)。 |
5.13. 亚马逊S3来源
该源应用支持使用亚马逊S3协议传输文件。
文件是从远程目录(S3桶)映射到当地应用程序部署的目录。
源端发出的消息默认以字节数组形式提供。然而,这可以是
通过--模式选择:
-
裁判提供
java.io.file参考 -
线会逐行拆分文件,并为每行发送一条新消息
-
内容默认。以字节数组的形式提供文件内容
使用--模式=线你也可以提供额外的选项--withMarkers=真.
如果设置为true, 基础文件分拆器会在实际数据之前和之后发送额外的文件开头和结束标记信息。
这两个额外标记消息的有效载荷为FileSplitter.FileMarker.选项withMarkers默认为false如果没有明确设置。
另见元数据存储选项,了解可能的共享持久存储配置,以防止重启时出现重复消息。
模式 = 线
5.13.3. 选项
S3 源有以下选项:
按前缀分组的属性:
file.consumer
- markers-json
-
当“fileMarkers == true”时,指定它们应生成为FileSplitter.FileMarker对象还是JSON。(布尔值,默认:
true) - 模式
-
FileReadingMode 用于文件读取源。值包括“ref”——文件对象,“lines”——每行一条消息,或“contents”——内容以字节形式表示。(文件阅读模式,默认:
<没有>,可能的值:裁判,线,内容) - 带标记
-
设置为 true,以便在数据之前或之后发送文件开始/结束文件标记信息。仅适用于FileReadingMode的“行”。(布尔值,默认:
<没有>)
Metadata.store.dynamo-db
- 创建延迟
-
创建表格重试之间的延迟。(整数,默认:
1) - 创建重试
-
创建表格请求的重试编号。(整数,默认:
25) - 读取容量
-
桌面上的读取容量。(长,默认:
1) - 桌子
-
元数据的表名。(字符串,默认:
<没有>) - 生存时间
-
表格条目的时间线。(整数,默认:
<没有>) - 写容量
-
写入容量放在桌面上。(长,默认:
1)
metadata.store
- 类型
-
表示要配置的元数据存储类型(默认为“内存”)。使用持久存储必须包含相应的 Spring Integration 依赖。(StoreType,默认:
<没有>,可能的值:蒙戈德,宝石火,Redis,迪纳莫德,JDBC,Zookeeper,Hazelcast,存储)
元数据.store.zookeeper
- 连接串
-
Zookeeper 连接字符串格式为 HOST:PORT。(字符串,默认:
127.0.0.1:2181) - 编码
-
用于在Zookeeper中存储数据时使用的编码。(字符集,默认:
UTF-8) - 重试区间
-
Zookeeper作的重试间隔以毫秒计。(整数,默认:
1000) - 根
-
根节点——存储条目是该节点的子节点。(字符串,默认:
/SpringIntegration-元数据存储)
S3.提供商
- 自动-创建-本地-dir
-
创建或不创建本地目录。(布尔值,默认:
true) - 删除远程文件
-
处理完后删除或不删除远程文件。(布尔值,默认:
false) - 文件名模式
-
过滤远程文件的模式。(字符串,默认:
<没有>) - 文件名正则表达式
-
正规表达式用于过滤远程文件。(模式,默认:
<没有>) - 仅列表
-
设置为 true,以返回 s3 对象元数据,但不复制文件到本地目录。(布尔值,默认:
false) - 本地导引
-
本地目录用于存储文件。(文件,默认:
<没有>) - 保留时间戳
-
是否将远程文件的时间戳转移到本地文件。(布尔值,默认:
true) - 远程指令
-
AWS S3 桶资源。(字符串,默认:
桶) - 远程文件分隔器
-
远程文件分离器。(字符串,默认:
/) - tmp-file-后缀
-
临时文件后缀。(字符串,默认:
.tmp)
5.13.4. 亚马逊AWS常见选项
Amazon S3 Source(和其他所有 Amazon AWS 应用一样)基于 Spring Cloud AWS 项目作为基础,并实现了自动配置 Spring Boot 会自动使用职业。 请参阅他们关于必需且有用的自动配置属性的文档。
其中一些是关于AWS凭证的:
-
cloud.aws.credentials.accessKey
-
cloud.aws.credentials.secretKey
-
cloud.aws.credentials.instanceProfile
-
cloud.aws.credentials.profileName
-
cloud.aws.credentials.profilePath
其他的则是AWS的地区定义:
-
cloud.aws.region.auto
-
cloud.aws.region.static
还有 AWS叠:
-
cloud.aws.stack.auto
-
cloud.aws.stack.name
5.14. SFTP来源
该源应用程序支持使用SFTP协议传输文件。
文件是从远程目录当地应用部署的目录。
源端发出的消息默认以字节数组形式提供。然而,这可以是
通过--模式选择:
-
裁判提供
java.io.file参考 -
线会逐行拆分文件,并为每行发送一条新消息
-
内容默认。以字节数组的形式提供文件内容
使用--模式=线你也可以提供额外的选项--withMarkers=真.
如果设置为true, 基础文件分拆器会在实际数据之前和之后发送额外的文件开头和结束标记信息。
这两个额外标记消息的有效载荷为FileSplitter.FileMarker.选项withMarkers默认为false如果没有明确设置。
看SFTP-提供商用于高级配置选项。
另见元数据存储选项,了解可能的共享持久存储配置,以防止重启时出现重复消息。
5.14.2. 输出
模式 = 目录
5.14.3. 选项
FTP源有以下选项:
按前缀分组的属性:
file.consumer
- markers-json
-
当“fileMarkers == true”时,指定它们应生成为FileSplitter.FileMarker对象还是JSON。(布尔值,默认:
true) - 模式
-
FileReadingMode 用于文件读取源。值包括“ref”——文件对象,“lines”——每行一条消息,或“contents”——内容以字节形式表示。(文件阅读模式,默认:
<没有>,可能的值:裁判,线,内容) - 带标记
-
设置为 true,以便在数据之前或之后发送文件开始/结束文件标记信息。仅适用于FileReadingMode的“行”。(布尔值,默认:
<没有>)
Metadata.store.dynamo-db
- 创建延迟
-
创建表格重试之间的延迟。(整数,默认:
1) - 创建重试
-
创建表格请求的重试编号。(整数,默认:
25) - 读取容量
-
桌面上的读取容量。(长,默认:
1) - 桌子
-
元数据的表名。(字符串,默认:
<没有>) - 生存时间
-
表格条目的时间线。(整数,默认:
<没有>) - 写容量
-
写入容量放在桌面上。(长,默认:
1)
metadata.store
- 类型
-
表示要配置的元数据存储类型(默认为“内存”)。使用持久存储必须包含相应的 Spring Integration 依赖。(StoreType,默认:
<没有>,可能的值:蒙戈德,宝石火,Redis,迪纳莫德,JDBC,Zookeeper,Hazelcast,存储)
元数据.store.zookeeper
- 连接串
-
Zookeeper 连接字符串格式为 HOST:PORT。(字符串,默认:
127.0.0.1:2181) - 编码
-
用于在Zookeeper中存储数据时使用的编码。(字符集,默认:
UTF-8) - 重试区间
-
Zookeeper作的重试间隔以毫秒计。(整数,默认:
1000) - 根
-
根节点——存储条目是该节点的子节点。(字符串,默认:
/SpringIntegration-元数据存储)
SFTP.提供商
- 自动-创建-本地-dir
-
设置为true以创建本地目录(如果不存在)。(布尔值,默认:
true) - 延迟-空时
-
当检测到无新文件时的延迟持续时间。(持续时间,默认:
1) - 删除远程文件
-
设置为true以在成功传输后删除远程文件。(布尔值,默认:
false) - 目录
-
一份工厂“name.directory”对的列表。(字符串[],默认:
<没有>) - 工厂
-
工厂名称映射到工厂。(Map<String, Factory>,默认:
<没有>) - 公平
-
对于多个服务器/目录的公平轮换来说,确实如此。默认情况下,这个错误,因此如果一个来源有多个条目,这些条目会在访问其他来源之前被接收。(布尔值,默认:
false) - 文件名模式
-
一个过滤模式,用来匹配要传输的文件名称。(字符串,默认:
<没有>) - 文件名正则表达式
-
一个过滤正则表达式模式,用于匹配要传输的文件名称。(模式,默认:
<没有>) - 仅列表
-
设置为 true,以返回文件元数据而不包含全部有效载荷。(布尔值,默认:
false) - 本地导引
-
用于文件传输的本地目录。(文件,默认:
<没有>) - 最大取物
-
每次轮询可获取的最大远程文件数;默认无限。在列出文件或构建任务启动请求时不适用。(整数,默认:
<没有>) - 保留时间戳
-
设置为true以保留原始时间戳。(布尔值,默认:
true) - 远程指令
-
远程FTP目录。(字符串,默认:
/) - 远程文件分隔器
-
远程文件分离器。(字符串,默认:
/) - rename-remote-files-to
-
解析为新名称的 SpEL 表达式在成功传输后必须重命名为(表达式,默认:
<没有>) - 流
-
设置为 true,以便流式传输文件,而不是复制到本地目录。(布尔值,默认:
false) - tmp-file-后缀
-
转移过程中使用的后缀。(字符串,默认:
.tmp)
5.15. 系统日志
syslog 源通过 UDP、TCP 或两者都接收 SYSLOG 数据包。支持RFC3164(BSD)和RFC5424格式。
5.15.1. 选项
- syslog.supplier.buffer-size
-
解码消息时使用的缓冲区大小;较大的消息将被拒绝。(整数,默认:
2048) - syslog.supplier.nio
-
是否使用NIO(支持大量连接)。(布尔值,默认:
false) - syslog.supplier.port
-
听的端口。(整数,默认:
1514) - syslog.supplier.protocol
-
用于SYSLOG的协议(TCP或UDP)。(协议,默认:
<没有>,可能的值:TCP,UDP,双) - syslog.supplier.reverse-lookup
-
是否对输入插槽进行反向查找。(布尔值,默认:
false) - syslog.supplier.rfc
-
“5424”或“3164”——根据RFC的syslog格式;3164 也被称为“BSD”格式。(字符串,默认:
3164) - syslog.supplier.socket-timeout
-
插槽的超时。(整数,默认:
0)
5.16. TCP
这TCP源代码作为服务器,允许远程方连接并通过原始TCP套接字提交数据。
TCP是一种流式传输协议,需要某种机制来在线路上对消息进行帧。有若干解码器 可用,默认为“CRLF”,兼容Telnet。
TCP源应用程序产生的消息具有字节[]有效载荷。
5.17. 时间源
时间源会定期发出一个包含当前时间的字符串。
5.17.1. 期权
时间源有以下选项:
- spring.cloud.stream.poller.cron
-
Cron 触发器的 Cron 表达式值。(字符串,默认:
<没有>) - spring.cloud.stream.poller.fixed-delay
-
默认轮询器的固定延迟。(长,默认:
1000) - spring.cloud.stream.poller.initial-delay
-
周期性触发的初始延迟。(整数,默认:
0) - 每条轮询数spring.cloud.stream.poller.max
-
默认轮询器的每轮询最大消息数。(长,默认:
1) - spring.cloud.stream.poller.time-unit
-
时间单元用于延迟值。(TimeUnit,默认:
<没有>,可能的值:纳 秒,微秒,毫秒,秒,纪要,小时,日) - 时间.日期格式
-
日期值格式。(字符串,默认:
嗯/dd/yy HH:mm:ss。)
5.18. 推特消息来源
反复检索过去30天内的直接消息(包括发送和接收),按倒时间顺序排序。
解除的消息被缓存(在元数据存储缓存)以防止重复。
默认情况下,内存中SimpleMetadataStore被使用。
这推特.消息.来源.计数控制返回消息的数量。
这spring.cloud.stream.poller属性控制消息轮询间隔。
必须与已使用API的速率限制保持一致
5.18.1. 期权
按前缀分组的属性:
spring.cloud.stream.poller
- 克朗
-
Cron 触发器的 Cron 表达式值。(字符串,默认:
<没有>) - 固定延迟
-
默认轮询器的固定延迟。(长,默认:
1000) - 初始延迟
-
周期性触发的初始延迟。(整数,默认:
0) - 每轮询最大消息数
-
默认轮询器的每轮询最大消息数。(长,默认:
1) - 时间单位
-
时间单元用于延迟值。(TimeUnit,默认:
<没有>,可能的值:纳 秒,微秒,毫秒,秒,纪要,小时,日)
Twitter.connection
- 访问Tokens
-
你的TwitterTokens。(字符串,默认:
<没有>) - 访问Tokens秘密
-
你的推特Tokens秘密。(字符串,默认:
<没有>) - 消费者密钥
-
你的推特密钥。(字符串,默认:
<没有>) - 消费者秘密
-
你的推特秘密。(字符串,默认:
<没有>) - 调试支持
-
启用 Twitter4J 调试模式。(布尔值,默认:
false) - raw-json
-
启用缓存由 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示。设置为 True 时,结果将使用原始的 Twitter APIS json 表示。(布尔值,默认:
true)
5.19. 推特搜索源
Twitter的标准搜索API(搜索/推文)允许对最近或热门推文的索引进行简单查询。这源对过去7天内发布的推文进行持续搜索。属于“公开”API集。
返回一组匹配特定查询的相关推文。
使用该spring.cloud.stream.poller属性用于控制连续搜索请求之间的间隔。速率限制 - 每30分钟窗口180个请求(例如~6转/分钟,~1个请求/10秒)
这推特.搜索查询属性允许按关键词查询,并按时间和地理位置筛选结果。
这推特.search.count和twitter.search.page根据搜索API控制结果分页。
注意:Twitter的搜索服务及其延伸的搜索API并非推文的详尽来源。并非所有推文都会被索引或通过搜索界面提供。
5.19.1. 期权
按前缀分组的属性:
spring.cloud.stream.poller
- 克朗
-
Cron 触发器的 Cron 表达式值。(字符串,默认:
<没有>) - 固定延迟
-
默认轮询器的固定延迟。(长,默认:
1000) - 初始延迟
-
周期性触发的初始延迟。(整数,默认:
0) - 每轮询最大消息数
-
默认轮询器的每轮询最大消息数。(长,默认:
1) - 时间单位
-
时间单元用于延迟值。(TimeUnit,默认:
<没有>,可能的值:纳 秒,微秒,毫秒,秒,纪要,小时,日)
Twitter.connection
- 访问Tokens
-
你的TwitterTokens。(字符串,默认:
<没有>) - 访问Tokens秘密
-
你的推特Tokens秘密。(字符串,默认:
<没有>) - 消费者密钥
-
你的推特密钥。(字符串,默认:
<没有>) - 消费者秘密
-
你的推特秘密。(字符串,默认:
<没有>) - 调试支持
-
启用 Twitter4J 调试模式。(布尔值,默认:
false) - raw-json
-
启用缓存由 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示。设置为 True 时,结果将使用原始的 Twitter APIS json 表示。(布尔值,默认:
true)
推特.搜索
- 计数
-
每页(例如每个单次请求)可返回的推文数量,最多可达100条。(整数,默认:
100) - 朗
-
限制搜索推文只能使用由 http://en.wikipedia.org/wiki/ISO_639-1 给定的语言。(字符串,默认:
<没有>) - 页
-
在重新从最近一条推文开始搜索之前,需要倒查页面数(例如请求数)。倒推推文总数为(页数乘以)(整数,默认:
3) - 查询
-
通过搜索查询词搜索推文。(字符串,默认:
<没有>) - 从最近最多回复开始
-
从最近的推文开始搜索,回复空。仅在第一次重启后应用(例如since_id != 无界)(布尔值,默认:
false) - 结果类型
-
具体说明你希望获得哪种类型的搜索结果。目前默认状态是“混合型”。有效值包括:混合:在回答中包含流行和实时结果。最近:仅返回回答中最新的结果 热门:返回回答中最受欢迎的结果(ResultType,默认:
<没有>,可能的值:流行,混合,最近) - 因为
-
如有指定,回传自指定日期以来的推文。日期应格式为 YYYY-MM-DD。(字符串,默认:
<没有>)
5.20. 推特流源
-
这
过滤 API返回与一个或多个过滤谓词匹配的公共状态。 多个参数支持使用单一连接流媒体API。 提示:该跟踪,跟随和地点场与运算符结合! 查询track=foo和跟随=1234回复 推文匹配测试或者由用户创建1234. -
这
示例API返回所有公共状态的一小部分随机样本。 默认访问级别返回的推文是相同的,所以如果两个不同的客户端连接到该端点,它们会看到相同的推文。
默认访问层允许最多400条关键词、5000个关注用户ID和25个0.1-360度位置框。
5.20.1. 期权
按前缀分组的属性:
Twitter.connection
- 访问Tokens
-
你的TwitterTokens。(字符串,默认:
<没有>) - 访问Tokens秘密
-
你的推特Tokens秘密。(字符串,默认:
<没有>) - 消费者密钥
-
你的推特密钥。(字符串,默认:
<没有>) - 消费者秘密
-
你的推特秘密。(字符串,默认:
<没有>) - 调试支持
-
启用 Twitter4J 调试模式。(布尔值,默认:
false) - raw-json
-
启用缓存由 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示。设置为 True 时,结果将使用原始的 Twitter APIS json 表示。(布尔值,默认:
true)
推特.stream.filter
- 计数
-
表示在切换到直播前需要直播的之前状态数量。(整数,默认:
0) - Filter级
-
过滤器级别限制了流中出现的推文,仅限于具有最小 filterLevel 属性值的推文。要么是无,要么是低,要么是中等。(FilterLevel,默认:
<没有>) - 跟随
-
通过ID指定用户接收公共推文。(列表<长>,默认:
<没有>) - 语言
-
指定流的推文语言。(List<String>,默认:
<没有>) - 地点
-
需要追踪的位置。内部表示为二维数组。边界框无效:52.38、4.90、51.51、-0.12。第一对必须是盒子的西南角(List<BoundingBox>,默认:
<没有>) - 跟踪
-
指定需要追踪的关键词。(List<String>,默认:
<没有>)
5.22. ZeroMQ 源
“zeromq”源能够接收来自ZeroMQ的消息。
5.22.3. 选项
zeromq 源具有以下选项:
- zeromq.supplier.bind-port
-
绑定端口用于创建 ZeroMQ 套接字;0 选择一个随机端口。(整数,默认:
0) - zeromq.supplier.connect-url
-
连接到 ZeroMQ 套接字的连接 URL。(字符串,默认:
<没有>) - zeromq.supplier.consume-delay
-
当没有收到数据时,ZeroMQ 套接字的消耗延迟。(持续时间,默认:
1) - zeromq.supplier.socket-type
-
连接应该形成的插槽类型。(SocketType,默认:
<没有>,可能的值:双,酒馆,子,要求,代表,贩子,路由器,拉,推,XPUB,XSUB,流) - zeromq.supplier.topics
-
订阅主题。(字符串[],默认:
[])
另请参阅 Spring Boot 文档,了解代理连接和监听者属性的附加属性。
6. 处理器
6.1. 聚合处理器
聚合处理器使应用程序能够将收到的消息聚集成组并释放到输出目的地。
Java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
如果你想在RabbitMQ上运行Kafka,可以把它改成Rabbit。
6.1.2. 选项
按前缀分组的属性:
聚合
- 集合体
-
聚合策略的SpEL表达式。默认是收集有效载荷。(表达式,默认:
<没有>) - 相关
-
相关性键的SpEL表达式。默认使用 correlationId 头部。(表达式,默认:
<没有>) - 小组暂停
-
SpEL表达式用于超时至到期的未完成组。(表达式,默认:
<没有>) - 消息存储实体
-
持久性消息存储实体:RDBMS 中的表前缀,MongoDb 中的集合名称等。(字符串,默认:
<没有>) - 消息存储类型
-
消息存储类型。(字符串,默认:
<没有>) - 释放
-
发布策略的SpEL表达式。默认设置基于序列大小头部。(表达式,默认:
<没有>)
spring.data.mongodb
- 认证数据库
-
认证数据库名称。(字符串,默认:
<没有>) - 自动索引创建
-
是否启用自动索引创建。(布尔值,默认:
<没有>) - 数据库
-
数据库名称。(字符串,默认:
<没有>) - 场命名策略
-
字段命名策略的完全限定名称。(职业<?>,默认:
<没有>) - 网格-FS-数据库
-
<缺少文档>(字符串,默认:
<没有>) - 主机
-
Mongo服务器主机。无法用URI设置。(字符串,默认:
<没有>) - 密码
-
Mongo服务器的登录密码。无法用URI设置。(字符[],默认:
<没有>) - 端口
-
Mongo服务器端口。无法用URI设置。(整数,默认:
<没有>) - 复制集名称
-
集群需要副本集名称。无法用URI设置。(字符串,默认:
<没有>) - 乌里
-
Mongo 数据库 URI。不能设置主机、端口、凭据和副本集名称。(字符串,默认:
mongodb://localhost/test) - 用户名
-
登录Mongo服务器的用户。无法用URI设置。(字符串,默认:
<没有>) - UUID 表示
-
将 UUID 转换为 BSON 二进制值时使用的表示方式。(UuidRepresentation,默认:
Java-遗产,可能的值:未具体说明,标准,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)
spring.datasource
- 错误继续
-
如果在初始化数据库时发生错误,是否应停止。(布尔值,默认:
false) - 数据
-
数据(DML)脚本资源引用。(List<String>,默认:
<没有>) - 数据密码
-
数据库的密码用于执行DML脚本(如果不同)。(字符串,默认:
<没有>) - 数据用户名
-
数据库的用户名用于执行DML脚本(如果不同)。(字符串,默认:
<没有>) - 驾驶员类别名称
-
JDBCDrivers的完全合格姓名。默认根据网址自动检测。(字符串,默认:
<没有>) - 嵌入数据库连接
-
嵌入式数据库的连接详情。默认使用类路径上最合适的嵌入式数据库。(嵌入数据库连接,默认:
<没有>,可能的值:没有,H2,德比,HSQL,HSQLDB) - 生成唯一名称
-
是否生成一个随机的数据源名称。(布尔值,默认:
true) - 初始化模式
-
在确定是否应使用可用的DDL和DML脚本执行DataSource初始化时,该模式适用。(DataSourceInitializationMode,默认:
嵌入式,可能的值:总是,嵌入式,从不) - JNDI名称
-
数据源的JNDI位置。设置时,类别、网址、用户名和密码都被忽略。(字符串,默认:
<没有>) - 名称
-
如果“generate-unique-name”为假,则使用数据源名称。使用嵌入式数据库时默认为“testdb”,否则为空。(字符串,默认:
<没有>) - 密码
-
数据库的登录密码。(字符串,默认:
<没有>) - 平台
-
用于DDL或DML脚本中的平台(如schema-${platform}.sql或data-${platform}.sql)。(字符串,默认:
都) - 图式
-
模式(DDL)脚本资源引用。(List<String>,默认:
<没有>) - schema-password
-
数据库的密码用于执行DDL脚本(如果不同)。(字符串,默认:
<没有>) - schema-username
-
数据库的用户名用于执行DDL脚本(如果不同)。(字符串,默认:
<没有>) - 分隔符
-
SQL 初始化脚本中的语句分隔符。(字符串,默认:
;) - SQL 脚本编码
-
SQL 脚本编码。(字符集,默认:
<没有>) - 类型
-
使用连接池实现的完全限定名称。默认情况下,它会被类路径自动检测到。(Class<DataSource>,默认:
<没有>) - 网址
-
数据库的JDBC网址。(字符串,默认:
<没有>) - 用户名
-
数据库用户名登录。(字符串,默认:
<没有>)
Spring.mongodb.embedded
- 特征
-
开启功能列表,逗号分隔。默认使用配置版本的默认值。(设置<功能>,默认:
[sync_delay]) - 版本
-
Mongo的版本。(字符串,默认:
3.5.5)
Spring.redis。
- 客户名
-
客户端名称应与客户端 SETNAME 连接时设置。(字符串,默认:
<没有>) - 客户端类型
-
使用客户端类型。默认情况下,根据类路径自动检测。(ClientType,默认:
<没有>,可能的值:生菜,杰迪斯) - 连接超时
-
连接超时。(持续时间,默认:
<没有>) - 数据库
-
连接工厂使用的数据库索引。(整数,默认:
0) - 主机
-
Redis服务器主机。(字符串,默认:
本地主持) - 密码
-
Redis 服务器的登录密码。(字符串,默认:
<没有>) - 端口
-
Redis服务器端口。(整数,默认:
6379) - SSL(高级SSL)
-
是否启用SSL支持。(布尔值,默认:
false) - 超时
-
读“暂停”。(持续时间,默认:
<没有>) - 网址
-
连接网址。覆盖主机、端口和密码。用户被忽视了。示例:redis://user:[email protected]:6379(字符串,默认:
<没有>) - 用户名
-
登录Redis服务器的用户名。(字符串,默认:
<没有>)
6.3. 滤波处理器
过滤处理器使应用程序能够检查进入的有效载荷,然后对其应用一个谓词,决定记录是否需要继续。
例如,如果输入的有效载荷类型为字符串如果你想过滤掉少于五个字符的内容,可以按照下面的方式运行过滤处理器。
Java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4
如果你想在RabbitMQ上运行Kafka,可以把它改成Rabbit。
6.5. 头部增益处理器
使用头部增强器应用添加消息头部。
头部以新行分隔键值对的形式提供,其中键为头部名称,值为SpEL表达式。
例如--headers='foo=payload.someProperty \n bar=payload.otherProperty'.
6.6. Http请求处理器
一个处理器应用,向HTTP资源发出请求,并将响应体作为消息载荷发出。
6.6.1. 输入
有效载荷
有效载荷默认作为POST请求的请求体,可以是任何Java类型。 对于 GET 请求,应该是空字符串。 有效载荷还可用于构建:
-
请求主体在
身体表情财产。 -
HTTP方法在
http-方法表达式财产。 -
在
URL 表达式财产。
底层Web客户端支持Jackson JSON序列化,必要时支持任何请求和响应类型。
这期望反应型属性String.class默认情况下,可以设置为你应用类路径中的任何类。
注意,用户自定义的负载类型需要向你的 pom 文件添加必要的依赖。
6.6.2. 输出
有效载荷
原始输出对象是 ResponseEntity<?>其任意字段(例如,身体,头)或访问器方法(状态代码)可以作为回复表达式.
默认情况下,出站消息有效载荷是响应体。
注意 ResponseEntity(由表达式#root)默认情况下不能被Jackson反序列化,但可以被表示为哈希图.
6.6.4. 期权
- http.request.body-expression
-
一个SpEL表达式,用于从收到的消息中推导出请求体。(表达式,默认:
<没有>) - http.request.expected-response-type
-
用来解读回应的类型。(职业<?>,默认:
<没有>) - http.request.headers-expression
-
用于推导 HTTP 头部映射的 SpEL 表达式。(表达式,默认:
<没有>) - http.request.http-method-expression
-
一个SpEL表达式,用于从收到的消息中推导出请求方法。(表达式,默认:
<没有>) - http.request.maximum-buffer-size
-
输入流缓冲区的最大缓冲区大小(以字节为单位)。默认是256k。根据需要,发布或获取大量二进制内容时可提高。(整数,默认:
0) - http.request.reply-expression
-
用于计算最终结果的 SpEL 表达式,应用于整个 http {@link org.springframework.http.ResponseEntity}。(表达式,默认:
<没有>) - http.request.timeout
-
请求超时,毫秒级。(长,默认:
30000) - http.request.url-expression
-
对入件消息的 SpEL 表达式来确定应使用的 URL。(表达式,默认:
<没有>)
6.7. 图像识别处理器
使用初始化模型进行分类的处理器 实时图像被分割成不同类别(例如标签)。
模型的输入是图像,作为二进制数组。
输出为以下格式的 JSON 消息:
{
"labels" : [
{"giant panda":0.98649305}
]
}
结果包含识别类别的名称(例如标签)以及图像代表该类别的置信度(例如置信度)。
如果响应-扣押当值设置为大于1时,结果将包括顶部响应-扣押可能的标签。例如响应大小=3将回归:
{
"labels": [
{"giant panda":0.98649305},
{"badger":0.010562794},
{"ice bear":0.001130851}
]
}
6.7.2. 期权
- image.recognition.cache-model
-
缓存预训练的张量流模型。(布尔值,默认:
true) - image.recognition.debug-output
-
<缺少文档>(布尔值,默认:
false) - image.recognition.debug-output-path
-
<缺少文档>(字符串,默认:
image-recognition-result.png) - image.recognition.model
-
预训练张量流图像识别模型。请注意,模型必须与所选模型类型相匹配!(字符串,默认:
https://storage.googleapis.com/mobilenet_v2/checkpoints/mobilenet_v2_1.4_224.tgz#mobilenet_v2_1.4_224_frozen.pb) - image.recognition.model-type
-
支持三种不同的预训练张量流图像识别模型:Inception、MobileNetV1 和 MobileNetV2 1。Inception Graph 使用“input”作为输入,“output”作为输出。2. MobileNetV2预训练模型:https://github.com/tensorflow/models/tree/master/research/slim/nets/mobilenet#pretrained-models——归一化后的图像尺寸始终为正方形(例如H=W)——图以“input”为输入,输出为“MobilenetV2/Predictions/Reshape_1”。3. MobileNetV1预训练模型:https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet_v1.md#pre-trained-models - 图以“input”为输入,输出为“MobilenetV1/Predictions/Reshape_1”。(ModelType,默认:
<没有>,可能的值:初始,移动网1,移动网2) - image.recognition.normalized-image-size
-
图像尺寸归一化。(整数,默认:
224) - 图像。识别。响应大小
-
识别图像数量。(整数,默认:
5)
6.8. 对象检测处理器
对象检测处理器开箱即用地支持 TensorFlow 对象检测 API。它允许在单幅图像或图像流中实时定位和识别多个物体。对象检测处理器构建在对象检测功能之上。
以下是一些合理的配置默认设置:
-
object.detection.model:storage.googleapis.com/scdf-tensorflow-models/object-detection/faster_rcnn_resnet101_coco_2018_01_28_frozen_inference_graph.pb -
对象.检测.标签:storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt -
object.detection.with-masks:false
下图展示了一个Spring云数据流流式流水线,实时预测输入图像流中的对象类型。
处理器的输入是一个图像字节数组,输出是一个增强后的图像和一个称为detected_objects,提供了检测对象的文本描述:
{
"labels" : [
{"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
{"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
{"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
{"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
]
}
这detected_objects头部格式为:
-
object-name:confidence - 检测对象的人类可读名称(如标签),其置信度为 [0-1] 之间的浮点
-
x1, y1, x2, y2 - 响应还提供了检测对象的边界框,表示为
(x1, y1, x2, y2).坐标相对于图像大小表示。 -
CID - 在所提供标签配置文件中定义的分类标识符。
6.8.2. 期权
- object.detection.cache-model
-
<缺少文档>(布尔值,默认:
true) - 对象。检测。信心
-
<缺少文档>(浮点,默认:
0.4) - object.detection.debug-output
-
<缺少文档>(布尔值,默认:
false) - object.detection.debug-output-path
-
<缺少文档>(字符串,默认:
object-detection-result.png) - 对象.检测.标签
-
标签为上呼吸道感染。(字符串,默认:
https://storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt) - object.detection.model
-
预训练张量流对象检测模型。(字符串,默认:
https://download.tensorflow.org/models/object_detection/ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz#frozen_inference_graph.pb) - object.detection.response-size
-
<缺少文档>(整数,默认:
<没有>) - object.detection.with-masks
-
<缺少文档>(布尔值,默认:
false)
6.9. 语义分割处理器
基于最先进的DeepLab张量流模型的图像语义分割。
这语义分割是将图像中的每个像素与类别标签(如花、人、道路、天空、海洋或汽车)关联起来的过程。
与实例细分,产生实例感知区域掩码,即语义分割产生类感知的掩码。
用于实现实例细分建议咨询对象检测服务。
有效载荷
入门类型为字节[],内容类型为应用/八位元组流.处理器处理输入字节[]图像和输出增强字节[]图像负载和json头部。
处理器的输入是一个图像字节数组,输出是一个增强图像字节数组和一个 JSON 头部semantic_segmentation采用此格式:
[
[ 0, 0, 0 ],
[ 127, 127, 127 ],
[ 255, 255, 255 ]
...
]
输出头json格式表示从输入图像计算出的彩色像素映射。
6.9.2. 期权
- 语义.segmentation.color-map-uri
-
每个预训练模型都基于特定的物体颜色映射。预定义的选项包括:- classpath:/colormap/citymap_colormap.json - classpath:/colormap/ade20k_colormap.json - classpath:/colormap/black_white_colormap.json - classpath:/colormap/mapillary_colormap.json(字符串,默认:
classpath:/colormap/citymap_colormap.json) - semantic.segmentation.debug-output
-
在本地debugOutputPath路径中保存输出图像。(布尔值,默认:
false) - semantic.segmentation.debug-output-path
-
<缺少文档>(字符串,默认:
semantic-segmentation-result.png) - semantic.segmentation.mask-transparency
-
计算出的分割掩码图像的α色。(浮点,默认:
0.45) - semantic.segmentation.model
-
预训练的张量流语义分割模型。(字符串,默认:
https://download.tensorflow.org/models/deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz#frozen_inference_graph.pb) - semantic.segmentation.output-type
-
指定输出图像类型。你可以返回带有计算出的遮罩覆盖的输入图像,或者只返回遮罩。(OutputType,默认:
<没有>,可能的值:混合,面具)
6.11. 分流处理器
分流器应用基于 Spring Integration 中同名的概念,允许将单条消息拆分成多个独立消息。
处理器使用一个函数,对留言<?>作为输入,然后产生列表<消息<?>作为基于各种性质的输出(见下文)。
你可以用SpEL表达式或分隔符来指定如何拆分收到的消息。
6.11.2. 选项
- splitter.apply-sequence
-
在头部添加相关/序列信息,便于后续聚合。(布尔值,默认:
true) - splitter.charset
-
在将基于文本的文件中字节转换为字符串时使用的字元集。(字符串,默认:
<没有>) - splitter.delimiters
-
当表达式为空时,分隔符用于分区{@link字符串}有效载荷分符。(字符串,默认:
<没有>) - 分流者。表达
-
用于拆分有效载荷的SpEL表达式。(字符串,默认:
<没有>) - splitter.file-markers
-
设置为true或false,以便使用{@code FileSplitter}(按行分割文本文件),该文件包含(或不包含)文件的开头/结尾标记。(布尔值,默认:
<没有>) - Splitter.markers-json
-
当“fileMarkers == true”时,指定它们应生成为FileSplitter.FileMarker对象还是JSON。(布尔值,默认:
true)
6.12. 变换处理器
Transformer 处理器允许你基于 SpEL 表达式转换消息有效载荷结构。
这里有一个运行该应用的示例。
Java -jar filter-processor-kafka-<version>.jar --spel.function.expression=toUpperCase()
如果你想在RabbitMQ上运行Kafka,可以把它改成Rabbit。
6.13. Twitter 趋势和趋势位置处理器
能够返回热门话题或热门话题的位置的处理器。
这推特。趋势。趋势-查询类型属性允许选择查询类型。
6.13.2. 检索趋势位置
对于该模式集合推特。趋势。趋势-查询类型自趋势位置.
按地点获取完整或附近的热门话题列表。
如果纬度,经度只要处理器执行趋势可用API,并返回Twitter拥有热门话题信息的位置,则不包含参数。
如果纬度,经度参数前提是处理器执行趋势最近 API,并返回 Twitter 拥有趋势话题信息且最接近指定位置的位置。
响应是一个数组地点该信息编码了该地点的 WOEID 以及一些人类可读的信息,比如该位置所属的官方名称和国家。
7. 沉没
7.1. 卡桑德拉·辛克
该汇入应用会将收到的每条消息的内容写入Cassandra。
它期望收到 JSON String 的负载,并用其属性映射到表列。
7.1.2. 期权
Cassandra水槽有以下选项:
- spring.data.cassandra.compression
-
压缩由Cassandra二进制协议支持。(压缩,默认:
没有,可能的值:LZ4,活泼,没有) - spring.data.cassandra.config
-
配置文件的位置。(资源,默认:
<没有>) - spring.data.cassandra.contact-points
-
集群节点地址形式为“host:port”,或简单地使用配置好的端口“host”。(List<String>,默认:
[127.0.0.1:9042]) - spring.data.cassandra.keyspace-name
-
使用密钥空间名称。(字符串,默认:
<没有>) - spring.data.cassandra.local-datacenter
-
被视为“本地”的数据中心。联系点应来自该数据中心。(字符串,默认:
<没有>) - spring.data.cassandra.password(春.数据.cassandra.password)
-
服务器登录密码。(字符串,默认:
<没有>) - spring.data.cassandra.port
-
如果某个接触点没有指定端口,可以使用端口。(整数,默认:
9042) - spring.data.cassandra.schema-action
-
启动时需要采取的模式作。(字符串,默认:
没有) - spring.data.cassandra.session-name
-
卡桑德拉的会议名称。(字符串,默认:
<没有>) - 春.数据.cassandra.ssl
-
启用SSL支持。(布尔值,默认:
false) - spring.data.cassandra.username
-
登录服务器用户。(字符串,默认:
<没有>)
7.2. 分析汇
Sink 应用构建在 Analytics Consumer 之上,能够从输入消息中计算分析数据,并将分析数据作为指标发布给各种监控系统。它利用微米库在最流行的监控系统中提供统一的编程体验,并揭示了Spring表达式语言(SpEL)特性,用于定义如何从输入数据计算度量名称、值和标签。
分析汇可以生成两种指标类型:
米(如计数器或规范)由其唯一标识名称和尺寸(尺寸和标签这两个术语可以互换使用。)维度允许对特定命名的指标进行切片,以深入分析和推理数据。
作为度量唯一由其标识的名称和尺寸你可以为每个指标分配多个标签(e.g. key/值对),但不能随意更改这些标签!像Prometheus这样的监控系统如果同名指标使用不同的标签组,就会抱怨。 |
使用该analytics.name或analytics.name-expression属性设置了输出分析指标的名称。如果没有设置,指标名称默认为应用程序名称。
使用该analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>,属性,用于在你的指标中添加一个或多个标签。这TAG_NAME属性定义中使用的指标将作为标签名称出现。TAG_VALUE是SpEL表达式,能够动态计算来自接收消息的标签值。
这SpEL表达式使用头和有效载荷关键词用于访问消息的头部和有效载荷值。
你可以使用文字(例如:“固定价值”)以设置固定值的标签。 |
所有Stream Applications一出厂就支持三种最受欢迎的监控系统,波,普罗 米修斯和InfluxDB你可以声明式地启用它们。
你可以通过在分析汇应用。
请访问春季云数据流监控,获取详细的监控系统配置说明。以下简要内容可以帮助你开始。
-
要启用普罗米修斯电表注册表,请设置以下属性。
management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
-
要启用波前表注册表,请设置以下属性。
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
-
要启用 InfluxDB 电表注册表,请设置以下属性。
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
如果启用了数据流服务器监控,那么分析汇将重复使用提供的指标配置。 |
下图展示了如何分析汇帮助收集企业内部信息,用于股票交易所和实时管道。
7.2.2. 选项
按前缀分组的属性:
分析学
- 数量表达
-
一个 SpEL 表达式用于计算输出度量值(例如数量)。默认是1.0(表达式,默认:
<没有>) - 米型
-
用于向后台报告指标的微米类型。(MeterType,默认:
<没有>,可能的值:计数器,轨距) - 名称
-
输出指标的名称。“name”和“nameExpression”是互斥的。只能设置其中一个。(字符串,默认:
<没有>) - 名称表达式
-
一个 SpEL 表达式,用于从输入消息中计算输出度量名称。“name”和“nameExpression”是互斥的。只能设置其中一个。(表达式,默认:
<没有>)
analytics.tag
- 表达
-
通过SpEL表达式计算标签。单个 SpEL 表达式可以产生数组值,这又意味着不同的名称/值标签。每个名字/值标签都会生成一个独立的计量器增量。标签表达格式为:analytics.tag.expression。[标签名]=[SpEL表达式](Map <String, Expression>,默认:
<没有>) - 固定
-
弃用:请使用带有字面SpEL表达式的analytics.tag.expression。自定义,固定标签。这些标签有固定值,创建一次后会随发布的指标一起发送。定义固定标签的惯例是:<code>analytics.tag.fixed。[标签名]=[标签值] </代码>(Map<String, String>,默认:
<没有>)
7.3. 弹性搜索汇
一个能将文档索引到Elasticsearch的Sink。
这个 Elasticsearch sink 只支持索引 JSON 文档。
它从输入目的地获取数据,然后将其索引到 Elasticsearch。
输入数据可以是普通的 JSON 字符串,也可以是java.util.Map代表 JSON。
它也接受Elasticsearch提供的数据XContentBuilder.
不过这种情况很少见,因为中间件不太可能像XContentBuilder.
这主要用于直接调用消费者。
7.3.1. 期权
Elasticsearch 汇具备以下选项:
按前缀分组的属性:
elasticsearch.consumer
- 异步
-
表示索引作是否异步。默认情况下,索引是同步完成的。(布尔值,默认:
false) - 批次大小
-
每个请求需要索引的项目数量。默认是1。对于大于 1 的值,将使用 批量索引 API。(整数,默认:
1) - 小组暂停
-
超时以毫秒计,之后在批量索引激活时会冲洗消息组。默认为 -1,意味着不会自动刷新空闲消息组。(长,默认:
-1) - 身份证
-
文档的索引ID。如果设置为,INDEX_ID头值会在每条消息上覆盖该属性。(表达式,默认:
<没有>) - 指数
-
索引名称。如果设置为,INDEX_NAME头值会在每条消息上覆盖该属性。(字符串,默认:
<没有>) - 路由
-
指示要路由的分片。如果未提供,Elasticsearch 将默认使用文档 ID 的哈希值。(字符串,默认:
<没有>) - 暂停秒
-
分片可用的时间暂停。如果没有设置,默认情况下由 Elasticsearch 客户端设置 1 分钟。(长,默认:
0)
7.3.2. 运行该水槽的示例
-
摘自文件夹
弹性搜索汇:./mvnw 清洁封装 -
CD应用
-
cd 到正确的绑定器生成应用(Kafka 或 RabbitMQ)
-
./mvnw 清洁封装 -
确保你运行着Elasticsearch。例如,你可以用以下命令将其作为 docker 容器运行。
Docker run -d --name es762 -p 9200:9200 -e “discovery.type=single-node” elasticsearch:7.6.2 -
如果中间件(Kafka 或 RabbitMQ)还没运行,就启动它。
-
Java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing -
把一些JSON数据发送到中间件目标。例如:
“foo”:bar“} -
确认数据已被索引:
curl localhost:9200/testing/_search
7.4. 文件汇
文件汇入应用会将它收到的每条消息写入文件。
7.4.2. 选项
这文件汇入有以下选项:
- file.consumer.binary
-
一个标志用于指示是否应在写入后添加换行。(布尔值,默认:
false) - file.consumer.charset
-
写文本内容时使用的字符集。(字符串,默认:
UTF-8) - file.consumer.directory
-
目标文件的父目录。(文件,默认:
<没有>) - file.consumer.directory-expression
-
计算目标文件父目录的表达式。(字符串,默认:
<没有>) - file.consumer.mode
-
如果目标文件已经存在,则使用 FileExistsMode 来使用。(FileExistsMode,默认:
<没有>,可能的值:附加,APPEND_NO_FLUSH,失败,忽视,取代,REPLACE_IF_MODIFIED) - file.consumer.name
-
目标文件的名称。(字符串,默认:
文件消费者) - file.consumer.name-expression
-
用于计算目标文件名称的表达式。(字符串,默认:
<没有>) - file.consumer.suffix
-
文件名后加上后缀。(字符串,默认:
<空字符串>)
7.5. FTP 汇
FTP sink 是一个简单的选项,可以将收到的消息推送文件到 FTP 服务器。
它使用了一个FTP-出站适配器因此,收到的消息可以是java.io.file对象,a字符串(文件内容)
或字节(文件内容也包括)
使用这个水槽需要用户名和密码才能登录。
默认情况下,Spring 集成会使用o.s.i.file.DefaultFileNameGenerator如果没有指定。DefaultFileNameGenerator将决定文件名
基于file_name在消息头,或者如果 的有效载荷消息已经是java.io.file那么它就会
使用该文件的原始名称。 |
7.5.4. 期权
FTP Sink 有以下选项:
按前缀分组的属性:
ftp.consumer
- 自动-创建-导演
-
是否创建远程目录。(布尔值,默认:
true) - 文件名表达式
-
一个用于生成远程文件名的 SpEL 表达式。(字符串,默认:
<没有>) - 模式
-
如果远程文件已经存在,则需要采取相应的作。(FileExistsMode,默认:
<没有>,可能的值:附加,APPEND_NO_FLUSH,失败,忽视,取代,REPLACE_IF_MODIFIED) - 远程指令
-
远程FTP目录。(字符串,默认:
/) - 远程文件分隔器
-
远程文件分离器。(字符串,默认:
/) - 临时-远程-DIR
-
一个临时目录,如果“#isUseTemporaryFilename()”为真,文件将被写入。(字符串,默认:
/) - tmp-file-后缀
-
转移过程中使用的后缀。(字符串,默认:
.tmp) - 使用临时文件名
-
是否写入临时文件并重命名。(布尔值,默认:
true)
7.6. 晶洞洼地
晶洞汇表会将消息内容写入晶洞区域。
7.6.1. 期权
晶洞水槽有以下选项:
按前缀分组的属性:
geode.consumer
- JSON
-
指示 Geode 区域是否将 json 对象存储为 PdxInstance。(布尔值,默认:
false) - key-expression
-
SpEL 表达式用作缓存键。(字符串,默认:
<没有>)
晶洞池
- 连接型
-
指定连接类型:“服务器”或“定位器”。(ConnectType,默认:
<没有>,可能的值:定位,服务器) - 主机地址
-
指定一个或多个 Gemfire 定位器或服务器地址,格式为 [host]:[port]。(InetSocketAddress[],默认:
<没有>) - 支持订阅
-
设置为 true,以启用客户端池的订阅。需要同步更新到客户端缓存。(布尔值,默认:
false)
geode.security.ssl
- 密码
-
将用于安全套接字连接的SSL密码配置为有效密码名称的数组。(字符串,默认:
任何) - keystore类型
-
识别用于SSL通信的密钥存储类型(如JKS、PKCS11等)。(字符串,默认:
开玩笑) - Keystore-URI
-
用于连接晶洞集群的预创建密钥存储URI的位置。(资源,默认:
<没有>) - SSL-keystore-password
-
访问密钥的密码是Truststore。(字符串,默认:
<没有>) - SSL-Truststore-password
-
访问信托存储的密码。(字符串,默认:
<没有>) - TrustStore类型
-
识别用于SSL通信的信任存储类型(例如JKS、PKCS11等)。(字符串,默认:
开玩笑) - Truststore-URI
-
用于连接Geode集群的预创建truststore库URI的位置。(资源,默认:
<没有>) - 用户主目录
-
本地目录用于缓存从 truststoreUri 和 keystoreUri 位置下载的 truststore 和 keystore 文件。(字符串,默认:
用户.home)
7.7. JDBC汇陷
JDBC sink 允许你将进入的有效载荷持久化到关系数据库数据库中。
这jdbc.consumer.columns性质表示COLUMN_NAME[:EXPRESSION_FOR_VALUE]哪里EXPRESSION_FOR_VALUE(与冒号一起)是可选的。此时,值通过生成的表达式计算,如有效载荷。 COLUMN_NAME这样我们就能直接映射对象属性到表列。例如,我们有一个像这样的 JSON 负载:
{
"name": "My Name",
"address": {
"city": "Big City",
"street": "Narrow Alley"
}
}
所以,我们可以将其代入到表中,且名称,城市和街结构基于以下构型:
--jdbc.consumer.columns=name,city:address.city,street:address.street
该汇入支持批处理插入,前提是底层JDBC驱动支持。批处理插入通过批次大小和闲置超时性能: 收到的消息会被聚合到批次大小消息存在后,作为批次插入。 如果闲置超时毫秒过去后没有新消息,即使聚合批次小于批次大小,限制最大延迟。
该模块还使用 Spring Boot 的 DataSource 支持来配置数据库连接,因此有以下属性spring.datasource.url 等等,申请。 |
7.7.1. 示例
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names --jdbc.consumer.columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.7.2. 期权
jdbc 汇具备以下选项:
按前缀分组的属性:
jdbc.consumer
- 批次大小
-
数据将被刷新到数据库表时,消息数量的阈值。(整数,默认:
1) - 列
-
逗号区分了基于冒号的列名对和 SpEL 表达式,用于插入/更新值。名称在初始化时用于发出 DDL。(字符串,默认:
payload:payload.toString()) - 闲置超时
-
当数据自动冲入数据库表时,空闲超时需要毫秒。(长,默认:
-1) - 初始化
-
“true”、“false”或表自定义初始化脚本的位置。(字符串,默认:
false) - 表名
-
写入的表格名称。(字符串,默认:
消息)
spring.datasource
- 数据
-
数据(DML)脚本资源引用。(List<String>,默认:
<没有>) - 驾驶员类别名称
-
JDBCDrivers的完全合格姓名。默认根据网址自动检测。(字符串,默认:
<没有>) - 初始化模式
-
在确定是否应使用可用的DDL和DML脚本执行DataSource初始化时,该模式适用。(DataSourceInitializationMode,默认:
嵌入式,可能的值:总是,嵌入式,从不) - 密码
-
数据库的登录密码。(字符串,默认:
<没有>) - 图式
-
模式(DDL)脚本资源引用。(List<String>,默认:
<没有>) - 网址
-
数据库的JDBC网址。(字符串,默认:
<没有>) - 用户名
-
数据库用户名登录。(字符串,默认:
<没有>)
7.8. 木头水槽
这日志Sink 使用应用日志器输出数据进行检查。
请理解这一点日志Sink 使用无类型处理程序,这会影响实际日志的执行方式。这意味着如果内容类型是文本,则原始有效载荷字节会转换为字符串,否则原始字节会被记录。更多信息请参见用户指南。
7.9. MongoDB 汇入
该汇应用将输入数据导入MongoDB。该应用完全基于MongoDataAutoConfiguration,更多信息请参阅 Spring Boot MongoDB 支持。
7.9.2. 期权
Mongodb水槽有以下选项:
按前缀分组的属性:
spring.data.mongodb
- 认证数据库
-
认证数据库名称。(字符串,默认:
<没有>) - 自动索引创建
-
是否启用自动索引创建。(布尔值,默认:
<没有>) - 数据库
-
数据库名称。(字符串,默认:
<没有>) - 场命名策略
-
字段命名策略的完全限定名称。(职业<?>,默认:
<没有>) - 网格-FS-数据库
-
<缺少文档>(字符串,默认:
<没有>) - 主机
-
Mongo服务器主机。无法用URI设置。(字符串,默认:
<没有>) - 密码
-
Mongo服务器的登录密码。无法用URI设置。(字符[],默认:
<没有>) - 端口
-
Mongo服务器端口。无法用URI设置。(整数,默认:
<没有>) - 复制集名称
-
集群需要副本集名称。无法用URI设置。(字符串,默认:
<没有>) - 乌里
-
Mongo 数据库 URI。不能设置主机、端口、凭据和副本集名称。(字符串,默认:
mongodb://localhost/test) - 用户名
-
登录Mongo服务器的用户。无法用URI设置。(字符串,默认:
<没有>) - UUID 表示
-
将 UUID 转换为 BSON 二进制值时使用的表示方式。(UuidRepresentation,默认:
Java-遗产,可能的值:未具体说明,标准,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)
7.10. MQTT 汇入
该模块向MQTT发送消息。
7.11. Pgcopy Shang
一个模块,通过 PostgreSQL COPY 命令将其收到的有效载荷写入 RDBMS。
7.11.3. 期权
jdbc 汇具备以下选项:
- Spring.datasource.driver-class-name
-
JDBCDrivers的完全合格姓名。默认根据网址自动检测。(字符串,默认:
<没有>) - spring.datasource.password(春.datasource.password)
-
数据库的登录密码。(字符串,默认:
<没有>) - spring.datasource.url
-
数据库的JDBC网址。(字符串,默认:
<没有>) - spring.datasource.username
-
数据库用户名登录。(字符串,默认:
<没有>)
该模块还使用 Spring Boot 的 DataSource 支持来配置数据库连接,因此有以下属性spring.datasource.url 等等,申请。 |
7.12. RabbitMQ 沉没
该模块向RabbitMQ发送消息。
7.12.1. 期权
兔子水槽有以下选项:
(有关RabbitMQ连接属性,请参见Spring Boot文档)
按前缀分组的属性:
兔
- 转换器豆名
-
自定义消息转换器的简称;如果省略,则使用 SimpleMessageConverter。如果是“jsonConverter”,就会为你创建一个Jackson2JsonMessageConverter豆。(字符串,默认:
<没有>) - 交换
-
交换名称——如果提供了,则被exchangeNameExpression覆盖。(字符串,默认:
<空字符串>) - 交换表达式
-
一个评估交换名称的SpEL表达式。(表达式,默认:
<没有>) - 头部映射最后
-
在映射出站消息的头部时,确定头部是在消息转换前映射,还是转换后。(布尔值,默认:
true) - 映射请求头
-
这些头部将被映射。(字符串[],默认:
[*]) - 自有连接
-
如果是真的,根据启动属性使用单独的连接。(布尔值,默认:
false) - 持久传递模式
-
当没有“amqp_deliveryMode”头部时,默认传递模式,对于持久性(PERSISTENT)成立。(布尔值,默认:
false) - 路由密钥
-
路由密钥——如果提供了,将被 routingKeyExpression 覆盖。(字符串,默认:
<没有>) - 路由密钥表达式
-
一个计算到路由键的 SpEL 表达式。(表达式,默认:
<没有>)
Spring.rabbitmq
- 地址洗牌模式
-
用于洗牌配置地址的模式。(AddressShuffleMode,默认:
没有,可能的值:没有,随机,顺序) - 地址
-
客户端应连接的地址列表,以逗号分隔。设置好后,主机和端口被忽略。(字符串,默认:
<没有>) - 通道-RPC超时
-
通道中RPC呼叫的续续超时。把它设为零,等待很久。(持续时间,默认:
10米) - 连接超时
-
连接超时。把它设为零,等待很久。(持续时间,默认:
<没有>) - 主机
-
RabbitMQ主机。如果地址被设置了,则忽略。(字符串,默认:
本地主持) - 密码
-
登录以验证经纪人身份。(字符串,默认:
客人) - 端口
-
RabbitMQ移植版。如果地址被设置了,则忽略。默认使用5672,启用SSL则使用5671。(整数,默认:
<没有>) - 出版者-确认-类型
-
出版商类型确认使用。(ConfirmType,默认:
<没有>,可能的值:简单,相关,没有) - 出版商回归
-
是否启用出版商退货。(布尔值,默认:
false) - 请求通道最大值
-
客户端请求的每个连接的频道数量。无限用0。(整数,默认:
2047) - 请求心跳
-
请求心跳暂停;零对零。如果未指定持续时间后缀,则使用秒。(持续时间,默认:
<没有>) - 用户名
-
登录用户以向经纪人进行身份验证。(字符串,默认:
客人) - 虚拟主机
-
连接经纪商时使用的虚拟主机。(字符串,默认:
<没有>)
7.13. Redis沉没
给Redis发消息。
7.13.1. 期权
Redis 水槽有以下选项:
按前缀分组的属性:
redis.consumer
- 钥匙
-
存储到密钥时使用的字面键名。(字符串,默认:
<没有>) - key-expression
-
一个用于存储到键的 SpEL 表达式。(字符串,默认:
<没有>) - 队列
-
存储队列时的字面队列名称。(字符串,默认:
<没有>) - 队列表达式
-
一个用于队列的 SpEL 表达式。(字符串,默认:
<没有>) - 主题
-
一个用于发布主题时使用的字面主题名称。(字符串,默认:
<没有>) - 主题表达
-
一个用于主题的特殊英语表达式。(字符串,默认:
<没有>)
Spring.redis。
- 客户名
-
客户端名称应与客户端 SETNAME 连接时设置。(字符串,默认:
<没有>) - 客户端类型
-
使用客户端类型。默认情况下,根据类路径自动检测。(ClientType,默认:
<没有>,可能的值:生菜,杰迪斯) - 连接超时
-
连接超时。(持续时间,默认:
<没有>) - 数据库
-
连接工厂使用的数据库索引。(整数,默认:
0) - 主机
-
Redis服务器主机。(字符串,默认:
本地主持) - 密码
-
Redis 服务器的登录密码。(字符串,默认:
<没有>) - 端口
-
Redis服务器端口。(整数,默认:
6379) - SSL(高级SSL)
-
是否启用SSL支持。(布尔值,默认:
false) - 超时
-
读“暂停”。(持续时间,默认:
<没有>) - 网址
-
连接网址。覆盖主机、端口和密码。用户被忽视了。示例:redis://user:[email protected]:6379(字符串,默认:
<没有>) - 用户名
-
登录Redis服务器的用户名。(字符串,默认:
<没有>)
spring.redis.jedis.pool(Spring)。redis.jedis.pool(Spring)。
- 最大主动
-
池在特定时间内可分配的最大连接数。使用负值表示无限制。(整数,默认:
8) - 最大怠速
-
池中“空闲”连接的最大数量。使用负数表示空闲连接数量无限。(整数,默认:
8) - 等一下
-
连接分配应在池用尽后才触发异常的最长阻塞时间。用负值无限阻挡。(持续时间,默认:
-1毫秒) - 最小怠速
-
目标是保持在池中最少的空闲连接数。这个设置只有在它和驱逐间隔时间都为正值时才有效。(整数,默认:
0) - 驱逐间隔时间
-
闲置对象Evictor线程运行间隔时间。当值为正时,空闲对象 Evictor 线程开始,否则不会执行空闲对象驱逐。(持续时间,默认:
<没有>)
spring.redis.letuce.pool(Spring.redis.生菜.pool)
- 最大主动
-
池在特定时间内可分配的最大连接数。使用负值表示无限制。(整数,默认:
8) - 最大怠速
-
池中“空闲”连接的最大数量。使用负数表示空闲连接数量无限。(整数,默认:
8) - 等一下
-
连接分配应在池用尽后才触发异常的最长阻塞时间。用负值无限阻挡。(持续时间,默认:
-1毫秒) - 最小怠速
-
目标是保持在池中最少的空闲连接数。这个设置只有在它和驱逐间隔时间都为正值时才有效。(整数,默认:
0) - 驱逐间隔时间
-
闲置对象Evictor线程运行间隔时间。当值为正时,空闲对象 Evictor 线程开始,否则不会执行空闲对象驱逐。(持续时间,默认:
<没有>)
7.14. 路由水槽
该应用程序将消息路由到指定信道。
7.14.1. 选项
路由器水槽有以下选项:
- router.default-output-channel
-
发送无法路由的消息。(字符串,默认:
零通道) - router.destination-mappings
-
目的地映射作为新行分隔串的名称-值对,例如 'foo=bar\n baz=car'。(属性,默认:
<没有>) - router.expression
-
将应用到消息上的表达式,以确定要路由到哪个信道。注意,文本、json 或 xml 等内容类型的有效载荷线格式是字节[],而非字符串!请参阅如何处理字节数组有效载荷内容的文档。(表达式,默认:
<没有>) - router.refresh-delay
-
如果有,MS中多久检查一次脚本变动;<0表示不要刷新。(整数,默认:
60000) - router.resolution-required(需要)
-
是否需要通道分辨率。(布尔值,默认:
false) - router.script
-
返回通道或通道映射分辨率键的groovy脚本位置。(资源,默认:
<没有>) - router.variables
-
变量绑定作为新行分隔串的名称-值对,例如 'foo=bar\n baz=car'。(属性,默认:
<没有>) - router.variables-location
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认:
<没有>)
由于这是一个动态路由器,目的地会根据需要创建;因此,默认情况下默认输出通道和需要分辨率仅在粘结 剂绑定目标时遇到了一些问题。 |
你可以用spring.cloud.stream.dynamic目的地财产。
默认情况下,所有已解析的目的地都会被动态绑定;如果该性质具有逗号分隔列表
目的地名称,只有那些名称会被绑定。
如果消息最终解析到不在该列表中的目的地,则会被路由到默认输出通道哪
也必须出现在列表中。
目的地地图用于将评估结果映射到实际的目的地名称。
7.14.2. 基于特殊语言的路由
该表达式会根据消息进行评估,返回信道名称或信道名称映射的密钥。
欲了解更多信息,请参阅春季版中的“路由器与Spring表达式语言(SpEL)”子章节 集成参考手册:配置通用路由器部分。
从Spring Cloud Stream 2.0开始,消息线格式为JSON,文本和XML内容类型为字节[]不字符串!
这是与SCSt 1.x相比的改变,后者将这些类型视为字符串!
这取决于内容类型,处理方法不同字节[]有效载荷是可用的。对于普通文本内容类型,可以将八位元组有效载荷转换为字符串,使用新字符串(有效载荷)SpEL表达。为JSON类型化 jsonPath() SpEL 工具
已经可以互换支持字符串和字节数组内容。同样适用于XML内容类型和 #xpath() SpEL 工具。 |
例如,当文本应使用以下内容类型:
new String(payload).contains('a')
以及JSON内容类型 SpEL 表达式如下:
#jsonPath(payload, '$.person.name')
7.14.3. 基于Groovy的路由
除了 SpEL 表达式,也可以使用 Groovy 脚本。我们在文件系统中创建一个 Groovy 脚本,在 “file:/my/path/router.groovy”,或“classpath:/my/path/router.groovy”:
println("Groovy processing payload '" + payload + "'");
if (payload.contains('a')) {
return "foo"
}
else {
return "bar"
}
如果你想把变量值传递给脚本,可以用变量选项静态绑定值,或者 可选地,可以通过propertiesLocation选项,将路径传递到包含绑定的properties文件。 文件中的所有属性都将作为变量向脚本开放。你可以同时指定变量和 propertiesLocation,在这种情况下,任何作为变量提供的重复值都会覆盖 propertiesLocation 中的值。 注意,payload 和 header 是隐式绑定的,可以让你访问消息中包含的数据。
更多信息请参见 Spring Integration 参考手册《Groovy Support》。
7.16. 亚马逊S3沉没
这个汇入应用支持将对象转移到亚马逊S3桶。
文件、有效载荷(以及递归的目录)会被传输到远程目录(S3桶)映射到当地应用程序部署的目录。
该汇入接收的消息必须包含以下有效载荷如:
-
文件,包括递归上传目录; -
输入流; -
字节[]
7.16.1. 期权
S3水槽有以下选项:
按前缀分组的属性:
s3.consumer
- 前交叉口
-
S3 对象访问控制列表。(CannedAccessControlList,默认:
<没有>,可能的值:私人,公开阅读,公网-读-写,认证读取,日志-交付-写入,桶所有者-读取,桶所有者-完全控制,aws-exec-read) - ACL表达式
-
用表达式来评估S3对象访问控制列表。(表达式,默认:
<没有>) - 桶
-
用于存储目标文件的 AWS 桶。(字符串,默认:
<没有>) - 桶表达式
-
用表达式来评估AWS的桶名称。(表达式,默认:
<没有>) - key-expression
-
用于评估S3对象键的表达式。(表达式,默认:
<没有>)
目标生成的应用程序基于AmazonS3SinkConfiguration可以通过以下方式增强S3MessageHandler.UploadMetadataProvider和/或S3ProgressListener(进步听众),这些 被注入到S3MessageHandler豆。
详情请参见 Spring 集成 AWS 支持。
7.16.2. 亚马逊AWS常见选项
Amazon S3 Sink(以及所有其他 Amazon AWS 应用)基于 Spring Cloud AWS 项目作为基础,并实现了自动配置 Spring Boot 会自动使用职业。 请参阅他们关于必需且有用的自动配置属性的文档。
其中一些是关于AWS凭证的:
-
cloud.aws.credentials.accessKey
-
cloud.aws.credentials.secretKey
-
cloud.aws.credentials.instanceProfile
-
cloud.aws.credentials.profileName
-
cloud.aws.credentials.profilePath
其他的则是AWS的地区定义:
-
cloud.aws.region.auto
-
cloud.aws.region.static
还有 AWS叠:
-
cloud.aws.stack.auto
-
cloud.aws.stack.name
7.17. SFTP沉没
SFTP sink 是一个简单的选项,可以将收到的消息推送文件到 SFTP 服务器。
它使用了一个SFTP-出站适配器因此,收到的消息可以是java.io.file对象,a字符串(文件内容)
或字节(文件内容也包括)
使用这个水槽需要用户名和密码才能登录。
默认情况下,Spring 集成会使用o.s.i.file.DefaultFileNameGenerator如果没有指定。DefaultFileNameGenerator将决定文件名
基于file_name在消息头,或者如果 的有效载荷消息已经是java.io.file那么它就会
使用该文件的原始名称。 |
在配置sftp.factory.known-hosts-expressionoption,评估的根对象是应用上下文,例如:sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.
7.17.3. 期权
sftp 水槽有以下选项:
按前缀分组的属性:
SFTP.Consumer
- 自动-创建-导演
-
是否创建远程目录。(布尔值,默认:
true) - 文件名表达式
-
一个用于生成远程文件名的 SpEL 表达式。(字符串,默认:
<没有>) - 模式
-
如果远程文件已经存在,则需要采取相应的作。(FileExistsMode,默认:
<没有>,可能的值:附加,APPEND_NO_FLUSH,失败,忽视,取代,REPLACE_IF_MODIFIED) - 远程指令
-
远程FTP目录。(字符串,默认:
/) - 远程文件分隔器
-
远程文件分离器。(字符串,默认:
/) - 临时-远程-DIR
-
一个临时目录,如果“isUseTemporaryFilename()”为真,文件将被写入。(字符串,默认:
/) - tmp-file-后缀
-
转移过程中使用的后缀。(字符串,默认:
.tmp) - 使用临时文件名
-
是否写入临时文件并重命名。(布尔值,默认:
true)
7.18. TCP 吸收
该模块通过编码器向TCP写入消息。
TCP是一种流式传输协议,需要某种机制来在线路上对消息进行帧。有若干编码器 可用,默认是“CRLF”。
7.20。推特消息汇
从认证用户向指定用户发送直接消息。
需要一个 JSON POST 正文,并且内容类型将 首部设置为application/json.
| 当用户收到消息时,在24小时内最多可发送5条回复。 每收到一条消息,都会重置24小时窗口和分配的5条消息。 在24小时窗口内发送第6条消息或在24小时窗口外发送消息将计入速率限制。 这种行为只在使用 POST direct_messages/事件/新端点时出现。 |
SpEL表达式用于计算输入消息的请求参数。
7.21. 推特更新汇报
更新认证用户当前的文本(例如推文)。
| 每次更新尝试时,更新文本都会与认证用户最近的推文进行比较。 任何可能导致重复的尝试都会被阻止,导致403错误。 用户不能连续提交相同的文本两次。 |
虽然API没有速率限制,但用户一次能创建的推文数量是有限的。 标准API的更新限制是3小时内300次。 如果用户发布的更新次数达到当前允许的限制,该方法将返回HTTP 403错误。
你可以在这里找到更新API的详细信息:developer.twitter.com/en/docs/tweets/post-and-engage/api-reference/post-statuses-update
7.21.1. 期权
按前缀分组的属性:
推特更新
- 附件-URL
-
(特殊英语表达)为了避免某个URL被计入扩展推文的正文,请提供一个URL作为推文附件。该URL必须是推文永久链接,或直接消息深度链接。文本文本中必须保留任意的非Twitter网址。传递给attachment_url参数且与推文永久链接或私信深度链接不匹配的URL将在推文创建时失败并触发异常。(表达式,默认:
<没有>) - 显示坐标
-
(特殊英语表达)是否要标记一条推文发出的具体坐标。(表达式,默认:
<没有>) - in-reply-to-status-id
-
(特殊英语表达)更新时回复的文本ID是现有的。注意:除非该参数引用推文的作者在正文中被提及,否则该参数将被忽略。因此,您必须在更新中包含@username,其中用户名是该推文的作者。当 inReplyToStatusID 被设置时,auto_populate_reply_metadata也会自动被设置。之后确保@mentions的引导推文会从原始推文中被查找,并从这里添加到新推文中。随着回复链的增长,@mentions会附加到推文的元数据中,直到@mentions的上限被提升。如果原始推文已被删除,回复将失败。(表达式,默认:
<没有>) - 媒体标识
-
(特殊英语表达)一份逗号分隔的media_ids列表,适合与推文关联。你最多可以在一条推文中包含4张照片、1个动画GIF或1个视频。有关上传媒体的更多细节,请参见“上传媒体”。(表达式,默认:
<没有>) - 地点标识
-
(特殊英语表达)世界上的一席之地。(表达式,默认:
<没有>) - 文本
-
(特殊英语表达)文本更新。根据需要进行URL编码。t.co 链接包裹会影响字符数。默认消息的有效载荷(表达式,默认:
有效载荷)
7.22. 波前汇陷
Wavefront 汇接收消息 <?>,将其转换为 Wavefront 数据格式的指标,并直接发送给 Wavefront 或 Wavefront 代理。
支持常见的ETL用例,即需要清理、转换现有(历史)指标数据并存储在Wavefront中以便进一步分析。
7.22.1. 选项
Wavefront洗手台有以下选项:
- wavefront.api-token(Wavefront.API-token)
-
Wavefront API 访问Tokens。(字符串,默认:
<没有>) - 波前。度量表达式
-
一个对度量值进行评估的SpEL表达式。(表达式,默认:
<没有>) - wavefront.metric-name
-
指标名称。默认应用名称。(字符串,默认:
<没有>) - Wavefront.proxy-uri
-
Wavefront代理的网址。(字符串,默认:
<没有>) - 波前。来源
-
唯一的应用程序、主机、容器或实例,能够发出指标。(字符串,默认:
<没有>) - wavefront.tag-expression
-
与度量相关的自定义元数据集合。点标签不能为空。键的有效字符包括:字母数字、连字符('-')、下划线('_')、点('.')。对于值,允许使用任何字符,包括空格。要包含双引号,请用反斜线转义。反斜线不能是标签值的最后一个字符。点标签键和值组合的最大允许长度为254个字符(包括分隔键和值的'='为255个字符)。如果值更长,则将该点拒绝并记录(Map <String, Expression>,默认:
<没有>) - 波前。时间戳表达式
-
一个 SpEL 表达式,可以对度量的时间戳进行评估(可选)。(表达式,默认:
<没有>) - 波前线.uri
-
Wavefront环境的网址。(字符串,默认:
<没有>)
7.23. Websocket Sink
一个简单的Websocket Sink实现。
7.23.1. 期权
支持以下选项:
- websocket.consumer.log级
-
netty 通道的 logLevel 。默认为 <tt>WARN</tt>(字符串,默认:
<没有>) - websocket.consumer.path
-
WebsocketSink 用户需要连接的路径。默认为 <tt>/websocket</tt>(字符串,默认:
/websocket) - websocket.consumer.port
-
Netty 服务器监听的端口。默认为 <tt>9292</tt>(整数,默认:
9292) - Websocket.consumer.ssl
-
是否创建{@link io.netty.handler.ssl.SslContext}。(布尔值,默认:
false) - websocket.consumer.threads
-
Netty {@link io.netty.channel.EventLoopGroup} 的线程数。默认为 <tt>1</tt>(整数,默认:
1)
7.23.2. 示例
为了验证websocket-sink是否接收来自其他Spring-Cloud-Stream应用的消息,你可以使用以下简单的端到端设置。
步骤3:部署Websocket-sink
最后启动一个websocket-sink跟踪模式,这样你就能看到由时间源日志中:
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE
你应该会在你启动WebsocketSink的控制台里看到日志消息,内容如下:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
7.23.3. 执行器
有一个端点你可以用它来访问最后一个n发送和接收的消息。你必须通过提供来实现--endpoints.websocketconsumertrace.enabled=true. 默认情况下,它通过host:port/websocketConsumertrace. 以下是示例输出:
[
{
"timestamp": 1445453703508,
"info": {
"type": "text",
"direction": "out",
"id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
"payload": "2015-10-21 20:54:33"
}
},
...
{
"timestamp": 1445453703506,
"info": {
"type": "text",
"direction": "out",
"id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
"payload": "2015-10-21 20:54:32"
}
}
]
还有一个简单的HTML页面,你可以在文本区域看到转发的消息。你可以直接访问通过以下方式host:port在你的浏览器里。
7.24. ZeroMQ 沉没
“zeromq”接收器使消息能够发送到ZeroMQ套接字。
7.24.3. 选项
zeromq 吸收具有以下选项:
- zeromq.consumer.connect-url
-
连接 ZeroMQ 套接字的连接URL。(字符串,默认:
<没有>) - zeromq.consumer.socket-type
-
连接应该建立的套筒类型。(SocketType,默认:
<没有>,可能的值:双,酒馆,子,要求,代表,贩子,路由器,拉,推,XPUB,XSUB,流) - zeromq.consumer.topic
-
一个主题 SpEL 表达式,用于在向订阅者发送消息前评估主题。(表达式,默认:
<没有>)