应用程序

5. 源码

5.1. Debezium 源

Debezium Engine based Change Data Capture (CDC) source. The Debezium Source allows capturing database change events and streaming those over different message binders such Apache Kafka, RabbitMQ and all Spring Cloud Stream supporter brokers.spring-doc.cadn.net.cn

该源代码可以与任何 Spring Cloud Stream 消息绑定器一起使用。 不受或不依赖于 Kafka Connect 框架。虽然这种方法灵活,但也会带来某些限制

所有 Debezium 配置属性都受支持。 只需在任何 Debezium 属性前加上 debezium.properties. 前缀。 例如,要设置 Debezium 的 connector.class 属性,请使用 debezium.properties.connector.class 源属性代替。spring-doc.cadn.net.cn

5.1.1. 数据库支持

The Debezium Source currently supports CDC for multiple datastores: MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, Db2, Vitess and Spanner databases.spring-doc.cadn.net.cn

5.1.2. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

Debezium
debezium-native-configuration

<文档缺失> (属性, 默认值: <none>)spring-doc.cadn.net.cn

header-format

{@link ChangeEvent} 头格式。默认为 'JSON'。(DebeziumFormat,默认值:<none>,可选值:JSONAVROPROTOBUF)spring-doc.cadn.net.cn

offset-commit-policy

定义何时应将偏移量提交到偏移存储的策略。(DebeziumOffsetCommitPolicy,默认值:<none>,可能的取值:ALWAYSPERIODICDEFAULT)spring-doc.cadn.net.cn

payload-format

{@link ChangeEvent} 键和负载格式。默认为 'JSON'。 (DebeziumFormat,缺省值: <none>,可能的取值: JSON,AVRO,PROTOBUF)spring-doc.cadn.net.cn

属性

Spring 对Debezium配置属性的传递包装器。所有以'debezium.properties.*'为前缀的属性都是原始的Debezium属性。(Map<String, String>, 默认值: <none>)spring-doc.cadn.net.cn

debezium.supplier
copy-headers

将更改事件标头复制到消息标头。(布尔值,默认:truespring-doc.cadn.net.cn

事件平铺配置

Debezium 提供了一种全面的消息格式,可以准确地详细说明系统中发生的变更信息。 有时候这种格式可能并不适合下游消费者,这些消费者可能需要消息格式简化,使得字段名称和值以 flattened 结构呈现。spring-doc.cadn.net.cn

为简化Debezium 连接器产生的事件记录的格式,您可以使用 Debezium 事件扁平化 消息转换。 使用 扁平化配置,您可以配置简单的消息格式,例如:spring-doc.cadn.net.cn

--debezium.properties.transforms=unwrap
--debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
--debezium.properties.transforms.unwrap.drop.tombstones=false
--debezium.properties.transforms.unwrap.delete.handling.mode=rewrite
--debezium.properties.transforms.unwrap.add.fields=name,db
Debezium 偏移量存储

When a Debezium source runs, it reads information from the source and periodically records offsets that define how much of that information it has processed. Should the source be restarted, it will use the last recorded offset to know where in the source information it should resume reading. Out of the box, the following offset storage configuration options are provided:spring-doc.cadn.net.cn

  • In-Memoryspring-doc.cadn.net.cn

    Doesn't persist the offset data but keeps it in memory. Therefore all offsets are lost on debezium source restart.
    --debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
  • 本地文件系统spring-doc.cadn.net.cn

    Store the offsets in a file on the local file system (the file can be named anything and stored anywhere). Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in the example below, once each minute).
    --debezium.properties.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
    --debezium.properties.offset.storage.file.filename=/tmp/offsets.dat (1)
    --debezium.properties.offset.flush.interval.ms=60000 (2)
    1 用于存储偏移量的文件路径。当offset.storage`设置为FileOffsetBackingStore时必需。
    2 间隔多久尝试提交偏移量。默认是1分钟。
  • Kafka 主题spring-doc.cadn.net.cn

    Uses a Kafka topic to store offset data.
    --debezium.properties.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore
    --debezium.properties.offset.storage.topic=my-kafka-offset-topic (1)
    --debezium.properties.offset.storage.partitions=2 (2)
    --debezium.properties.offset.storage.replication.factor=1 (3)
    --debezium.properties.offset.flush.interval.ms=60000 (4)
    1 用于存储偏移量的Kafka主题的名称。当offset.storage被设置为KafkaOffsetBackingStore时必需。
    2 在创建偏移存储主题时使用的分区数量。
    3 复制因子用于在创建偏移量存储主题时使用。
    4 间隔多久尝试提交偏移量。默认是1分钟。

一个可以实现org.apache.kafka.connect.storage.OffsetBackingStore接口以提供与自定义后端键值存储绑定的偏移存储。spring-doc.cadn.net.cn

5.1.3. 示例和测试

Debezium 集成测试使用本地机器上的数据库固定数据。借助 Testcontainers 使用预构建的Debezium Docker 数据库镜像。spring-doc.cadn.net.cn

要在您的 IDE 中运行和调试测试,需要从命令行部署所需的数据库映像。下面的说明解释了如何从 Docker 映像运行预配置的测试数据库。spring-doc.cadn.net.cn

MySQL

在 docker 中启动 debezium/example-mysqlspring-doc.cadn.net.cn

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.3.3.Final
(可选) 使用 mysql 客户端连接到数据库并创建具有所需凭据的 debezium 个用户:
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

使用以下属性连接 Debezium 源到 MySQL 数据库:spring-doc.cadn.net.cn

debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector (1)

debezium.properties.name=my-connector (2)
debezium.properties.topic.prefix=my-topic (2)
debezium.properties.database.server.id=85744 (2)


debezium.properties.database.user=debezium (3)
debezium.properties.database.password=dbz (3)
debezium.properties.database.hostname=localhost (3)
debezium.properties.database.port=3306 (3)

debezium.properties.schema=true (4)
debezium.properties.key.converter.schemas.enable=true (4)
debezium.properties.value.converter.schemas.enable=true (4)

debezium.properties.transforms=unwrap (5)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (5)
debezium.properties.transforms.unwrap.add.fields=name,db (5)
debezium.properties.transforms.unwrap.delete.handling.mode=none (5)
debezium.properties.transforms.unwrap.drop.tombstones=true (5)

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (6)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (6)
1 配置 Debezium 源使用 MySqlConnector
2 用于识别和分派传入事件的元数据。
3 连接到运行在localhost:3306上的MySQL服务器,作为debezium用户。
4 包含在 Change Event Value 模式中的 ChangeEvent 消息。
5 启用 事件扁平化
6 源状态在多次启动之间进行保留。

您也可以使用此mysql配置运行DebeziumDatabasesIntegrationTest#mysql()spring-doc.cadn.net.cn

禁用 mysql GenericContainer 测试初始化代码。
PostgreSQL

debezium/example-postgres:1.0 Docker镜像启动一个预配置的postgres服务器:spring-doc.cadn.net.cn

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:2.3.3.Final

您可以像这样连接到此服务器:spring-doc.cadn.net.cn

psql -U postgres -h localhost -p 5432

使用以下属性连接 Debezium 源到 PostgreSQL:spring-doc.cadn.net.cn

debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector (1)

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)

debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)

debezium.properties.database.user=postgres  (4)
debezium.properties.database.password=postgres (4)
debezium.properties.database..dbname=postgres (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=5432 (4)

debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)

debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 配置 Debezium Source 以使用 PostgresConnector
2 配置 Debezium 发引用来使用 memory 存储。
3 用于识别和分派传入事件的元数据。
4 连接到运行在localhost:5432上的PostgreSQL服务器,作为postgres用户。
5 包含在消息中的Change Event Value模式
6 启用 事件扁平化

你也可以使用此 postgres 配置运行 DebeziumDatabasesIntegrationTest#postgres()spring-doc.cadn.net.cn

禁用 postgres 通用容器测试初始化代码。
MongoDB

debezium/example-mongodb:2.3.3.Final 容器镜像启动一个预配置的 mongodb:spring-doc.cadn.net.cn

docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz  debezium/example-mongodb:2.3.3.Final

初始化库存集合spring-doc.cadn.net.cn

docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'

mongodb 终端输出中,搜索类似于 host: "3f95a8a6516e:27017" 的日志条目:spring-doc.cadn.net.cn

2019-01-10T13:46:10.004+0000 I COMMAND  [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms

在你的/etc/hosts中添加127.0.0.1 3f95a8a6516e条目spring-doc.cadn.net.cn

使用以下属性连接 Debezium 源到 MongoDB:spring-doc.cadn.net.cn

debezium.properties.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector (1)

debezium.properties.topic.prefix=my-topic
debezium.properties.name=my-connector
debezium.properties.database.server.id=85744

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)

debezium.properties.mongodb.hosts=rs0/localhost:27017 (3)
debezium.properties.topic.prefix=dbserver1 (3)
debezium.properties.mongodb.user=debezium (3)
debezium.properties.mongodb.password=dbz (3)
debezium.properties.database.whitelist=inventory (3)

debezium.properties.tasks.max=1 (4)

debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)

debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 配置 Debezium Source 以使用 MongoDB Connector
2 配置 Debezium 发行引擎使用 memory
3 连接到运行在localhost:27017上的MongoDB,作为debezium用户。
4 debezium.io/docs/connectors/mongodb/#tasks
5 SourceRecord事件中包含更改事件值模式。
6 Enables the Chnage Event Flattening.

你也可以使用此mongodb配置运行DebeziumDatabasesIntegrationTest#mongodb()spring-doc.cadn.net.cn

SQL Server

debezium/example-postgres:1.0 Docker 镜像启动一个 sqlserverspring-doc.cadn.net.cn

docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2

包含来自 debezium SqlServer 教程的示例数据 Populate:spring-doc.cadn.net.cn

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

使用以下属性连接 Debezium 源到 SQLServer:spring-doc.cadn.net.cn

debezium.properties.connector.class=io.debezium.connector.sqlserver.SqlServerConnector (1)

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)

debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)

debezium.properties.database.user=sa  (4)
debezium.properties.database.password=Password! (4)
debezium.properties.database..dbname=testDB (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=1433 (4)
1 配置 Debezium Source 以使用 SqlServerConnector
2 配置 Debezium 发引用来使用 memory 状态存储。
3 用于识别和分派传入事件的元数据。
4 连接到在localhost:1433上运行的SQL Server,作为sa用户。

你可以使用这种 SQL Server 配置来运行 also 的 0。spring-doc.cadn.net.cn

Oracle

启动Oracle并从localhost访问,按照Debezium Vagrant设置中描述的配置、用户和授权进行设置spring-doc.cadn.net.cn

包含来自 Debezium Oracle 示例教程的示例数据的表单:spring-doc.cadn.net.cn

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1

5.2. 文件源

此应用程序轮询一个目录,并将新文件或其内容发送到输出通道。<br/>默认情况下,文件源会提供文件的内容作为字节数组。<br/>但是,可以使用 --file.supplier.mode 选项进行自定义:spring-doc.cadn.net.cn

在使用 --file.supplier.mode=lines 时,还可以提供附加选项 --file.supplier.withMarkers=true。如果将其设置为 true,则底层的 FileSplitter 在实际数据之前和之后会发出额外的文件开始和结束标记消息。这两个额外标记消息的有效载荷类型为 FileSplitter.FileMarker。如果未明确设置,则 withMarkers 选项默认为 false。spring-doc.cadn.net.cn

5.2.1. 选项

文件源具有以下选项:filespring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

file.consumer
markers-json

当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:true)spring-doc.cadn.net.cn

模式

用于文件读取源的文件读取模式。值为 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 内容作为字节。(FileReadingMode,默认:<none>,可能的值:reflinescontents)spring-doc.cadn.net.cn

with-markers

设置为 true 可在数据之前/之后发出文件开始/结束标记消息。仅当 FileReadingMode 设置为 'lines' 时有效。(Boolean,默认值:<none>)spring-doc.cadn.net.cn

file.supplier
delay-when-empty

未检测到新文件时的延迟持续时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

目录

要轮询新文件的目录。(文件,默认:<none>)spring-doc.cadn.net.cn

filename-pattern

一个简单的通配符模式,用于匹配文件。 (字符串,默认:<none>)spring-doc.cadn.net.cn

filename-regex

用于匹配文件的正则表达式模式。(Pattern,默认值:<none>)spring-doc.cadn.net.cn

prevent-duplicates

设置为 true 可包含一个 AcceptOnceFileListFilter,防止重复。(布尔值,默认:true)spring-doc.cadn.net.cn

metadata.store.dynamo-db
create-delay

创建表重试之间的延迟。(整数,默认:1)spring-doc.cadn.net.cn

create-retries

创建表请求的重试次数。(整数,默认:25)spring-doc.cadn.net.cn

read-capacity

表上的读取容量。(长整型,默认:1spring-doc.cadn.net.cn

表格

用于元数据的表名。(字符串,默认:<none>)spring-doc.cadn.net.cn

time-to-live

表项的TTL。 (整数,默认:<none>spring-doc.cadn.net.cn

write-capacity

表上的写入容量。(长整型,默认:1)spring-doc.cadn.net.cn

metadata.store.jdbc
区域

使用此存储持久化的消息的唯一分组标识符。(字符串,默认:DEFAULT)spring-doc.cadn.net.cn

table-prefix

自定义表名的前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store.mongo-db
集合

元数据的 MongoDB 集合名称。(字符串,默认:metadataStore)spring-doc.cadn.net.cn

metadata.store.redis

元数据的 Redis 键。 (字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store
类型

指定要配置的元数据存储类型(默认为“内存”)。若要使用持久化存储,必须包含相应的 Spring Integration 依赖。(StoreType,默认:<none>,可能值:mongodbredisdynamodbjdbczookeeperhazelcastmemory)spring-doc.cadn.net.cn

metadata.store.zookeeper
connect-string

Zookeeper连接字符串格式为HOST:PORT。(字符串,默认:127.0.0.1:2181spring-doc.cadn.net.cn

编码

存储数据到Zookeeper时使用的编码。(字符集,默认:UTF-8)spring-doc.cadn.net.cn

retry-interval

Zookeeper操作的重试间隔,单位为毫秒。(Integer,默认:1000)spring-doc.cadn.net.cn

根目录

根节点 - 存储条目是此节点的子节点。(字符串,默认:/SpringIntegration-MetadataStore)spring-doc.cadn.net.cn

5.3. FTP源

此源应用程序支持使用FTP协议传输文件。 文件从remote目录传输到部署应用的local目录中。 默认情况下,源发出的消息以字节数组形式提供。但是,可以使用--mode选项进行自定义:spring-doc.cadn.net.cn

使用 --mode=lines 时,还可以提供附加选项 --withMarkers=true。如果设置为 true,底层的 FileSplitter 将在实际数据之前和之后发出额外的 文件开始文件结束 标记消息。这两个附加标记消息的有效载荷类型为 FileSplitter.FileMarkerwithMarkers 的默认值是 false,除非明确设置。spring-doc.cadn.net.cn

另请参阅MetadataStore选项,了解用于防止重启时重复消息的共享持久存储配置。
spring-doc.cadn.net.cn

5.3.1. 输入

N/A(从FTP服务器获取文件)。spring-doc.cadn.net.cn

5.3.2. 输出

模式 = 内容
标题:
负载:

用文件内容填充的 byte[]spring-doc.cadn.net.cn

模式 = 行
标题:
负载:

每行一个Stringspring-doc.cadn.net.cn

第一行可选地由带有START标记有效负载的消息前导。
最后一行可选地由带有END标记有效负载的消息后置。
spring-doc.cadn.net.cn

标记的存在和格式由with-markersmarkers-json属性确定。spring-doc.cadn.net.cn

模式 = 参考
标题:
负载:

一个 java.io.File 对象。spring-doc.cadn.net.cn

5.3.3. 选项

ftp源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

file.consumer
markers-json

当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:true)spring-doc.cadn.net.cn

模式

用于文件读取源的文件读取模式。值为 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 内容作为字节。(FileReadingMode,默认:<none>,可能的值:reflinescontents)spring-doc.cadn.net.cn

with-markers

设置为 true 可在数据之前/之后发出文件开始/结束标记消息。仅当 FileReadingMode 设置为 'lines' 时有效。(Boolean,默认值:<none>)spring-doc.cadn.net.cn

ftp.factory
cache-sessions

缓存会话。 (布尔值,默认:<none>spring-doc.cadn.net.cn

client-mode

FTP会话使用的客户端模式。(ClientMode,默认:<none>,可选值:ACTIVEPASSIVE)spring-doc.cadn.net.cn

主机

服务器的主机名。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

连接到服务器时使用的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

服务器的端口。(整数,默认:21)spring-doc.cadn.net.cn

用户名

连接服务器时要使用的用户名。(字符串,默认值:<none>)spring-doc.cadn.net.cn

ftp.supplier
auto-create-local-dir

如果不存在,则设置为 true 以创建本地目录。(Boolean,默认:true)spring-doc.cadn.net.cn

delay-when-empty

未检测到新文件时的延迟持续时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

delete-remote-files

设置为true表示在成功传输后删除远程文件。(布尔类型,默认:false)spring-doc.cadn.net.cn

filename-pattern

一个过滤器模式,用于匹配要传输的文件名。(字符串,默认:<none>)spring-doc.cadn.net.cn

filename-regex

用于匹配要传输的文件名的过滤正则表达式模式。(Pattern,默认值:<none>)spring-doc.cadn.net.cn

local-dir

用于文件传输的本地目录。(文件,默认:<none>)spring-doc.cadn.net.cn

preserve-timestamp

设置为 true 可保留原始时间戳。(布尔类型,默认值:true)spring-doc.cadn.net.cn

remote-dir

远程 FTP 目录。(字符串,默认:/spring-doc.cadn.net.cn

remote-file-separator

远程文件分隔符。(字符串,默认:/)spring-doc.cadn.net.cn

tmp-file-suffix

传输进行期间使用的后缀。(字符串,默认:.tmp)spring-doc.cadn.net.cn

metadata.store.dynamo-db
create-delay

创建表重试之间的延迟。(整数,默认:1)spring-doc.cadn.net.cn

create-retries

创建表请求的重试次数。(整数,默认:25)spring-doc.cadn.net.cn

read-capacity

表上的读取容量。(长整型,默认:1spring-doc.cadn.net.cn

表格

用于元数据的表名。(字符串,默认:<none>)spring-doc.cadn.net.cn

time-to-live

表项的TTL。 (整数,默认:<none>spring-doc.cadn.net.cn

write-capacity

表上的写入容量。(长整型,默认:1)spring-doc.cadn.net.cn

metadata.store.jdbc
区域

使用此存储持久化的消息的唯一分组标识符。(字符串,默认:DEFAULT)spring-doc.cadn.net.cn

table-prefix

自定义表名的前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store.mongo-db
集合

元数据的 MongoDB 集合名称。(字符串,默认:metadataStore)spring-doc.cadn.net.cn

metadata.store.redis

元数据的 Redis 键。 (字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store
类型

指定要配置的元数据存储类型(默认为“内存”)。若要使用持久化存储,必须包含相应的 Spring Integration 依赖。(StoreType,默认:<none>,可能值:mongodbredisdynamodbjdbczookeeperhazelcastmemory)spring-doc.cadn.net.cn

metadata.store.zookeeper
connect-string

Zookeeper连接字符串格式为HOST:PORT。(字符串,默认:127.0.0.1:2181spring-doc.cadn.net.cn

编码

存储数据到Zookeeper时使用的编码。(字符集,默认:UTF-8)spring-doc.cadn.net.cn

retry-interval

Zookeeper操作的重试间隔,单位为毫秒。(Integer,默认:1000)spring-doc.cadn.net.cn

根目录

根节点 - 存储条目是此节点的子节点。(字符串,默认:/SpringIntegration-MetadataStore)spring-doc.cadn.net.cn

5.3.4. 示例

java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
         --ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo

5.4. HTTP 源

一个监听HTTP请求并将其主体作为消息负载发出的应用程序。 如果Content-Type匹配text/*application/json,负载将是一个字符串, 否则负载将是一个字节数组。spring-doc.cadn.net.cn

5.4.1. 有效载荷:

If content type matches text/*application/jsonspring-doc.cadn.net.cn

如果内容类型不匹配 text/*application/jsonspring-doc.cadn.net.cn

5.4.2. 选项

The http source 支持以下配置属性:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

http.cors
allow-credentials

是否应包含与被注解请求的域关联的任何cookie。 (布尔值,默认: <none>)spring-doc.cadn.net.cn

allowed-headers

实际请求中可以使用的请求头列表。 (String[], 默认: <none>)spring-doc.cadn.net.cn

allowed-origins

允许的 origins 列表,例如 https://domain1.com。 (String[], 默认值: <none>)spring-doc.cadn.net.cn

HTTP
mapped-request-headers

要映射的标头。 (String[], 默认值: <none>)spring-doc.cadn.net.cn

path-pattern

HTTP端点路径映射。 (String, 默认值: /)spring-doc.cadn.net.cn

服务器
端口

服务器HTTP端口。 (整数, 默认值: 8080)spring-doc.cadn.net.cn

5.5. JDBC 源

此源从关系型数据库管理系统(RDBMS)中轮询数据。 此源完全基于 DataSourceAutoConfiguration,有关更多信息,请参阅 Spring Boot JDBC 支持spring-doc.cadn.net.cn

5.5.1. 有效载荷

  • Map<String, Object>jdbc.split == true(默认)且 List<Map<String, Object>> 其他情况时spring-doc.cadn.net.cn

5.5.2. 选项

数据源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

jdbc.supplier
max-rows

Max numbers of rows to process for query。 (Integer, 默认值: 0)spring-doc.cadn.net.cn

查询

用于选择数据的查询。 (String,默认值: <none>)spring-doc.cadn.net.cn

分割

是否将SQL查询结果拆分为单独的消息。 (布尔值, 默认: true)spring-doc.cadn.net.cn

更新

用于将轮询的消息标记为“已查看”的SQL更新语句。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.datasource
driver-class-name

JDBC 驱动程序的完全限定名称。默认情况下根据 URL 自动检测。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

数据库登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

url

数据库的 JDBC URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

数据库登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.integration.poller
定时任务

Cron 表达式用于轮询。与 'fixedDelay' 和 'fixedRate' 相互排斥。(字符串,默认:<none>)spring-doc.cadn.net.cn

fixed-delay

轮询延迟周期。与 'cron' 和 'fixedRate' 相互排斥。(持续时间,默认:<none>)spring-doc.cadn.net.cn

fixed-rate

轮询速率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认:<none>)spring-doc.cadn.net.cn

initial-delay

轮询初始延迟。对 'fixedDelay' 和 'fixedRate' 生效;对于 'cron' 被忽略。(持续时间,默认:<none>)spring-doc.cadn.net.cn

max-messages-per-poll

轮询周期内每次轮询要获取的最大消息数量。(整数,默认:<none>)spring-doc.cadn.net.cn

receive-timeout

轮询时等待消息的时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

也请参见 Spring Boot 文档 对于额外的 DataSource 属性和 TriggerProperties 以及 MaxMessagesProperties 的轮询选项。spring-doc.cadn.net.cn

5.6. JMS 源

JMS 源可接收来自 JMS 的消息。spring-doc.cadn.net.cn

5.6.1. 选项

JMS 的源有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

jms.supplier
client-id

用于持久订阅的客户端 ID。(字符串,默认:<none>)spring-doc.cadn.net.cn

目的地

要从中接收消息的目标(队列或主题)。(字符串,默认:<none>)spring-doc.cadn.net.cn

message-selector

消息的选择器。(字符串,默认:<none>)spring-doc.cadn.net.cn

session-transacted

启用事务并选择 DefaultMessageListenerContainer,不启用则选择 SimpleMessageListenerContainer。 (布尔类型,默认:true)spring-doc.cadn.net.cn

subscription-durable

持久订阅为真。 (布尔类型,默认:<none>)spring-doc.cadn.net.cn

subscription-name

持久或共享订阅的名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

subscription-shared

共享订阅为真。(布尔值,默认:<none>spring-doc.cadn.net.cn

spring.jms
jndi-name

连接工厂JNDI名称。设置后,优先于其他连接工厂自动配置。(字符串,默认:<none>)spring-doc.cadn.net.cn

pub-sub-domain

默认的目标类型是否为主题。(布尔值,默认:false)spring-doc.cadn.net.cn

spring.jms.listener
acknowledge-mode

容器的确认模式。默认情况下,监听器是事务性的,并使用自动确认。(AcknowledgeMode,默认值:<none>,可选值:AUTOCLIENTDUPS_OK)spring-doc.cadn.net.cn

auto-startup

启动时自动启动容器。(布尔类型,默认值:true)spring-doc.cadn.net.cn

并发

最小并发消费者数量。当未指定最大并发数时,将使用最小值作为最大值。(整型,默认:<none>)spring-doc.cadn.net.cn

max-concurrency

最大并发消费者数。(整型,默认:<none>)spring-doc.cadn.net.cn

receive-timeout

接收调用时使用的超时时间。使用-1表示无等待接收,0表示不设置超时。如果不在事务管理器内运行,则仅可能设置为0,并且通常不建议这样做,因为它会阻止干净关闭。(持续时间,默认:1s)spring-doc.cadn.net.cn

5.7. Apache Kafka Source

此模块从Apache Kafka消费消息。spring-doc.cadn.net.cn

5.7.1. 选项

The kafka source has the following options:spring-doc.cadn.net.cn

(参见 Spring for Apache Kafka 的 Spring Boot 配置属性文档)spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

kafka.supplier
ack-discarded

是否在 'RecordFilterStrategy' 后确认被丢弃的记录。(布尔值,默认:false)spring-doc.cadn.net.cn

record-filter

用于 'RecordFilterStrategy' 的 SpEL 表达式,以 'ConsumerRecord' 作为根对象。(表达式,默认:<none>)spring-doc.cadn.net.cn

topic-pattern

Apache Kafka 主题模式以供订阅。(模式,默认:<none>)spring-doc.cadn.net.cn

主题

要订阅的 Apache Kafka 主题。(String[],默认值:<none>)spring-doc.cadn.net.cn

spring.kafka
bootstrap-servers

用于建立与Kafka集群初始连接的主机:端口对的逗号分隔列表。除非被覆盖,否则适用于所有组件。(List<String>, 默认:<none>)spring-doc.cadn.net.cn

client-id

ID to pass to the server when making requests. Used for server-side logging. (String, default: <none>)spring-doc.cadn.net.cn

属性

用于配置客户端的附加属性,适用于生产者和消费者。(Map<String, String>, 默认值: <none>)spring-doc.cadn.net.cn

spring.kafka.consumer
auto-commit-interval

如果将 'enable.auto.commit' 设置为 true,则消费者偏移量自动提交到 Kafka 的频率。(持续时间,默认:<none>)spring-doc.cadn.net.cn

auto-offset-reset

当 Kafka 中没有初始偏移量或当前偏移量在服务器上不再存在时该怎么办。(字符串,默认:<none>)spring-doc.cadn.net.cn

bootstrap-servers

用于建立与 Kafka 集群的初始连接的主机:端口对的逗号分隔列表。覆盖消费者的全局属性。(List<String>, 默认值: <none>)spring-doc.cadn.net.cn

client-id

ID to pass to the server when making requests. Used for server-side logging. (String, default: <none>)spring-doc.cadn.net.cn

enable-auto-commit

消费者偏移量是否周期性地在后台提交。(布尔型,默认:<none>)spring-doc.cadn.net.cn

fetch-max-wait

服务器在回答获取请求之前阻塞的最大时间,如果没有足够的数据立即满足由"fetch-min-size"指定的要求。(持续时间,默认:<none>)spring-doc.cadn.net.cn

fetch-min-size

服务器应为获取请求返回的数据最小量。(DataSize,默认值:<none>)spring-doc.cadn.net.cn

group-id

唯一字符串,用于标识消费者所属的消费者组。(字符串,默认:<none>)spring-doc.cadn.net.cn

heartbeat-interval

消费者协调器的心跳间隔预期时间。(持续时间,默认:<none>)spring-doc.cadn.net.cn

isolation-level

读取已事务性写入的消息时的隔离级别。(IsolationLevel,默认:read-uncommitted,可选值:READ_UNCOMMITTEDREAD_COMMITTED)spring-doc.cadn.net.cn

key-deserializer

用于键的反序列化程序类。(Class<?>, 默认:<none>)spring-doc.cadn.net.cn

max-poll-records

单次调用 poll() 方法时返回的最大记录数。(Integer,默认值:<none>)spring-doc.cadn.net.cn

属性

用于配置客户端的额外消费者特定属性。(Map<String, String>, 默认值: <none>)spring-doc.cadn.net.cn

value-deserializer

值的反序列化器类。(Class<?>, 默认: <none>)spring-doc.cadn.net.cn

spring.kafka.listener
ack-count

当 ackMode 为 "COUNT" 或 "COUNT_TIME" 时,偏移量提交之间的记录数量。(整数,默认:<none>)spring-doc.cadn.net.cn

ack-mode

监听器 AckMode。请参阅 spring-kafka 文档。(AckMode,默认值:<none>,有效值:RECORDBATCHTIMECOUNTCOUNT_TIMEMANUALMANUAL_IMMEDIATE)spring-doc.cadn.net.cn

ack-time

当 ackMode 为 "TIME" 或 "COUNT_TIME" 时,偏移量提交之间的间隔时间。(持续时间,默认:<none>)spring-doc.cadn.net.cn

async-acks

支持异步记录确认。仅当spring.kafka.listener.ack-mode为manual或manual-immediate时适用。(布尔值,默认:<none>)spring-doc.cadn.net.cn

auto-startup

容器是否自动启动。(布尔类型,默认:true)spring-doc.cadn.net.cn

client-id

监听器的消费者 client.id 属性前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

并发

监听器容器中运行的线程数。(Integer,默认值:<none>)spring-doc.cadn.net.cn

idle-between-polls

消费者调用 Consumer.poll(Duration) 方法之间的睡眠间隔。(持续时间,默认:0)spring-doc.cadn.net.cn

idle-event-interval

发布空闲消费者事件的时间间隔(未收到数据)。(持续时间,默认:<none>)spring-doc.cadn.net.cn

idle-partition-event-interval

发布空分区消费者事件的时间间隔(分区没有收到数据时)。(持续时间,默认:<none>)spring-doc.cadn.net.cn

immediate-stop

容器是在处理完当前记录后停止,还是在处理完之前轮询的所有记录后才停止。(布尔值,默认:false)spring-doc.cadn.net.cn

log-container-config

初始化期间是否记录容器配置(INFO级别)(布尔值,默认:<none>spring-doc.cadn.net.cn

missing-topics-fatal

如果配置的主题至少有一个在代理上不存在,容器是否应该启动失败。(布尔值,默认:falsespring-doc.cadn.net.cn

monitor-interval

消费者无响应时检查的时间间隔。如果未指定持续时间后缀,则使用秒。(持续时间,默认:<none>)spring-doc.cadn.net.cn

no-poll-threshold

应用于“pollTimeout”以确定消费者是否无响应的乘数。(浮点型,默认:<none>spring-doc.cadn.net.cn

poll-timeout

轮询消费者时使用的超时时间。(持续时间,默认:<none>)spring-doc.cadn.net.cn

类型

监听器类型。(类型,默认:single)spring-doc.cadn.net.cn

5.8. 加载生成器源码

一个产生生成数据并将其发送到流的源。spring-doc.cadn.net.cn

5.8.1. 选项

The load-generator 源代码具有以下选项:spring-doc.cadn.net.cn

load-generator.generate-timestamp

是否生成时间戳。 (布尔值, 默认: false)spring-doc.cadn.net.cn

load-generator.message-count

消息数量. (Integer, 默认值: 1000)spring-doc.cadn.net.cn

load-generator.message-size

消息大小。 (整数, 默认值: 1000)spring-doc.cadn.net.cn

load-generator.producers

生产者数量。 (整数, 默认值: 1)spring-doc.cadn.net.cn

5.9. 邮件发送源

一个监听邮件并将其消息正文作为消息负载发出的应用程序源代码。spring-doc.cadn.net.cn

5.9.1. 选项

邮件 有以下选项:spring-doc.cadn.net.cn

mail.supplier.charset

字节数组邮件到字符串转换的字符集。 (字符串,默认:UTF-8)spring-doc.cadn.net.cn

mail.supplier.delete

设置为 true 表示下载邮件后将其删除。(布尔类型,默认:false)spring-doc.cadn.net.cn

mail.supplier.expression

配置一个 SpEL 表达式来选择消息。(字符串,默认:true)spring-doc.cadn.net.cn

mail.supplier.idle-imap

设置为 true 以使用 IdleImap 配置。(布尔类型,默认:false)spring-doc.cadn.net.cn

mail.supplier.java-mail-properties

JavaMail 属性作为由换行符分隔的 name-value 对字符串,例如 'foo=bar baz=car'。 (Properties, 默认: <none>)spring-doc.cadn.net.cn

mail.supplier.mark-as-read

设置为 true 可标记邮件为已读。(布尔类型,默认:false)spring-doc.cadn.net.cn

mail.supplier.url

邮件服务器连接的URL,例如'imaps://用户名:[[电子邮件受保护]:993/收件箱'。(URLName,默认:<none>)spring-doc.cadn.net.cn

mail.supplier.user-flag

当服务器不支持 ecent时标记消息的标志。(字符串,默认:<none>)spring-doc.cadn.net.cn

5.10. MongoDB 数据源

此源从MongoDB获取数据。 此源完全基于MongoDataAutoConfiguration,请参阅 Spring Boot MongoDB支持 以获取更多信息。spring-doc.cadn.net.cn

5.10.1. 选项

The mongodb source 有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

mongodb.supplier
集合

要查询的MongoDB集合。(字符串,默认:<none>)spring-doc.cadn.net.cn

查询

MongoDB 查询。 (字符串,默认:{ })spring-doc.cadn.net.cn

query-expression

MongoDB 查询 DSL 样式的 SpEL 表达式。(表达式,默认:<none>spring-doc.cadn.net.cn

分割

是否将查询结果拆分为单个消息。(布尔类型,默认:true)spring-doc.cadn.net.cn

update-expression

MongoDB 更新 DSL 样式的 SpEL 表达式。(表达式,默认:<none>spring-doc.cadn.net.cn

spring.data.mongodb
additional-hosts

附加服务器主机。不能与 URI 设置同时使用,也不能在未指定 'host' 的情况下设置。附加主机将使用默认的 MongoDB 端口 27017。如果要使用其他端口,可以使用 "host:port" 语法。(List<String>, 默认值: <none>)spring-doc.cadn.net.cn

authentication-database

认证数据库名称。(字符串,默认:<none>spring-doc.cadn.net.cn

auto-index-creation

是否启用自动索引创建。(布尔类型,默认:<none>)spring-doc.cadn.net.cn

数据库

数据库名。覆盖URI中的数据库。(字符串,默认:<none>spring-doc.cadn.net.cn

field-naming-strategy

要使用的 FieldNamingStrategy 的完全限定名。(Class<?>, 默认:<none>)spring-doc.cadn.net.cn

主机

Mongo服务器主机。不能与URI一起设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

登录 MongoDB 服务器的密码。不能与 URI 一起设置。(字符数组,默认:<none>)spring-doc.cadn.net.cn

端口

Mongo服务器端口。不能与URI一起设置。(Integer,默认:<none>spring-doc.cadn.net.cn

replica-set-name

群集所需的副本集名称。不能与URI一起设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uri

Mongo 数据库连接字符串。此选项会覆盖主机、端口、用户名和密码。(String,默认值:mongodb://localhost/testspring-doc.cadn.net.cn

用户名

登录 MongoDB 服务器的用户。不能与 URI 同时设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uuid-representation

转换 UUID 为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认:java-legacy,可能的值:UNSPECIFIEDSTANDARDC_SHARP_LEGACYJAVA_LEGACYPYTHON_LEGACY)spring-doc.cadn.net.cn

spring.data.mongodb.gridfs

GridFS存储桶名称。 (字符串,默认值:<none>)spring-doc.cadn.net.cn

数据库

GridFS 数据库名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.mongodb.ssl
捆绑

SSL捆绑包名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

启用

是否启用SSL支持。如果提供了"bundle",则会自动启用,除非另外指定。(布尔值,默认:<none>)spring-doc.cadn.net.cn

也请参见 Spring Boot 文档 以了解额外的 MongoProperties 属性。 查看并 TriggerProperties 了解轮询选项。spring-doc.cadn.net.cn

5.11. MQTT 源

源代码可用于接收来自 MQTT 的消息。spring-doc.cadn.net.cn

5.11.1. Payload:

5.11.2. 选项

MQTT 源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

mqtt
clean-session

客户端和服务器是否应记住重启和重新连接之间的状态。(布尔值,默认:true)spring-doc.cadn.net.cn

connection-timeout

连接超时时间(秒)。 (Integer, 默认值:30)spring-doc.cadn.net.cn

keep-alive-interval

ping间隔,单位为秒。(整数,默认值:60)spring-doc.cadn.net.cn

密码

连接到代理时要使用的密码。(字符串,默认:guest)spring-doc.cadn.net.cn

持久化

'memory' 或 'file'。 (字符串,默认:memory)spring-doc.cadn.net.cn

persistence-directory

持久化目录。 (字符串,默认:/tmp/pahospring-doc.cadn.net.cn

ssl-properties

MQTT 客户端 SSL 属性。(Map<String, String>,默认:<none>spring-doc.cadn.net.cn

url

MQTT代理的位置(以逗号分隔的列表)。(字符串数组,默认:[tcp://localhost:1883])spring-doc.cadn.net.cn

用户名

连接到代理时使用的用户名。(字符串,默认:guest)spring-doc.cadn.net.cn

mqtt.supplier
二进制

将负载保留为字节形式。 (布尔值,默认:false)spring-doc.cadn.net.cn

编码

用于将字节转换为字符串的字符集(当二进制为假时)。(字符串,默认:UTF-8)spring-doc.cadn.net.cn

client-id

标识客户端。(字符串,默认:stream.client.id.source)spring-doc.cadn.net.cn

服务质量

qos;适用于所有主题的单一值或逗号分隔的列表以匹配主题。(Integer[],默认:[0])spring-doc.cadn.net.cn

主题

源将要订阅的主题(用逗号分隔)。(String[],默认:[stream.mqtt])spring-doc.cadn.net.cn

5.12. RabbitMQ 源

The "rabbit" 源启用接收来自RabbitMQ的消息。spring-doc.cadn.net.cn

The queue(s) 必须在流部署之前存在;它们不会自动创建。 您可以轻松地使用 RabbitMQ 网页界面创建一个队列。spring-doc.cadn.net.cn

5.12.1. 输入

5.12.2. 输出

负载

5.12.3. 选项

Rabbit 源代码有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

rabbit.supplier
enable-retry

启用重试的布尔值。(Boolean,默认值:false)spring-doc.cadn.net.cn

initial-retry-interval

启用重试时的初始重试间隔。(整数,默认值:1000)spring-doc.cadn.net.cn

mapped-request-headers

要映射的标头。 (String[], 默认值: [STANDARD_REQUEST_HEADERS])spring-doc.cadn.net.cn

max-attempts

重试启用时的最大交付尝试次数。(整数,默认:3)spring-doc.cadn.net.cn

max-retry-interval

启用重试时的最大重试间隔。(整数,默认:30000)spring-doc.cadn.net.cn

own-connection

为 true 时,使用基于引导属性的单独连接。(布尔类型,默认值:false)spring-doc.cadn.net.cn

队列

源将监听消息的队列。(字符串数组,默认:<none>)spring-doc.cadn.net.cn

重新排队

拒绝的消息是否应重新排队。(布尔值,默认:true)spring-doc.cadn.net.cn

retry-multiplier

启用重试时的重试退避倍数。(Double,默认:2)spring-doc.cadn.net.cn

事务性

渠道是否已交易。(布尔型,默认:false)spring-doc.cadn.net.cn

spring.rabbitmq
address-shuffle-mode

用于打乱配置地址的模式。(AddressShuffleMode,默认值:none,可选值:NONERANDOMINORDER)spring-doc.cadn.net.cn

地址

客户端应连接的地址列表,用逗号分隔。设置后,主机和端口将被忽略。(字符串,默认:<none>)spring-doc.cadn.net.cn

channel-rpc-timeout

通道中RPC调用的继续超时时间。将其设置为零以无限期等待。(持续时间,默认:10m)spring-doc.cadn.net.cn

connection-timeout

连接超时时间。将其设置为零以无限期等待。(持续时间,默认:<none>)spring-doc.cadn.net.cn

主机

RabbitMQ主机。如果设置了地址,则忽略此设置。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

登录以向代理进行身份验证。(字符串,默认:guest)spring-doc.cadn.net.cn

端口

RabbitMQ 端口。如果设置了地址,则忽略此设置。默认为 5672,如果启用了 SSL,则默认为 5671。(Integer,默认:<none>)spring-doc.cadn.net.cn

publisher-confirm-type

发布者类型确认要使用的。(ConfirmType,默认值:<none>,可能的取值:SIMPLECORRELATEDNONE)spring-doc.cadn.net.cn

publisher-returns

是否启用发布商返还。(布尔值,默认:falsespring-doc.cadn.net.cn

requested-channel-max

客户端请求的每个连接的通道数。使用0表示无限制。(整数,默认:2047)spring-doc.cadn.net.cn

requested-heartbeat

请求的心跳超时时间;零表示无。如果未指定持续时间后缀,则使用秒。(持续时间,默认:<none>)spring-doc.cadn.net.cn

用户名

登录用户以对经纪人进行身份验证。(字符串,默认:guest)spring-doc.cadn.net.cn

virtual-host

连接到代理时使用的虚拟主机。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.rabbitmq.listener.simple
acknowledge-mode

容器确认模式。(AcknowledgeMode,默认值:<none>,可选值:NONEMANUALAUTO)spring-doc.cadn.net.cn

auto-startup

是否在启动时自动启动容器。(布尔类型,默认:true)spring-doc.cadn.net.cn

batch-size

批处理大小,以物理消息的数量表示,将由容器使用。(整数,默认:<none>)spring-doc.cadn.net.cn

并发

监听器调用线程的最小数量。(整数,默认:<none>)spring-doc.cadn.net.cn

consumer-batch-enabled

容器是否根据'receive-timeout'和'batch-size'创建一批消息。将'de-batching-enabled'强制转换为true,以便在批处理中包含生产者创建的批次的内容作为离散记录。(布尔值,默认:false)spring-doc.cadn.net.cn

de-batching-enabled

容器是否应该将批处理消息作为离散消息呈现,还是调用监听器并传入整个批次。(布尔类型,默认:true)spring-doc.cadn.net.cn

default-requeue-rejected

被拒收的交货是否默认重新排队。(布尔值,默认:<none>spring-doc.cadn.net.cn

idle-event-interval

空闲容器事件应多久发布一次。(持续时间,默认:<none>spring-doc.cadn.net.cn

max-concurrency

监听器调用线程的最大数量。(整数,默认:<none>)spring-doc.cadn.net.cn

missing-queues-fatal

容器声明的队列在代理上不可用时是否失败,以及运行时如果一个或多个队列被删除是否停止容器。(布尔值,默认:true)spring-doc.cadn.net.cn

预取

每个消费者可以挂起未确认消息的最大数量。(整数,默认:<none>)spring-doc.cadn.net.cn

spring.rabbitmq.listener
类型

监听器容器类型。(ContainerType,默认:simple,可选值:SIMPLEDIRECTSTREAM)spring-doc.cadn.net.cn

也请参见 Spring Boot 文档 对于 broker 连接和监听器属性的额外属性。spring-doc.cadn.net.cn

一个关于重试的注意事项
在默认的 ackModeAUTO)和 requeuetrue)选项下,失败的消息投递将无限次重试。由于Rabbit源本身处理的内容不多,因此源本身出现故障的风险较小,除非下游的Binder因某种原因未连接。将 重试 设置为 false 将导致消息在首次尝试时被拒绝(如果代理配置了死信交换/队列,则可能发送到该位置)。enableRetry 选项允许配置重试参数,以便在消息传递失败时进行重试,并在重试次数用尽后最终丢弃(或死信)。在重试间隔期间,交付线程被挂起。重试选项包括enableRetrymaxAttemptsinitialRetryIntervalretryMultipliermaxRetryInterval。使用 MessageConversionException 导致的消息传递失败永远不会重试;假设如果消息在第一次尝试时无法转换,后续尝试也会失败。此类消息将被丢弃(或转入死信队列)。

5.12.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在以下位置找到相应的绑定器项目。<br>然后进入其中一个文件夹并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

5.12.5. 示例

java -jar rabbit-source.jar --rabbit.queues=

5.13. 亚马逊S3源

此源应用程序支持使用 Amazon S3 协议传输文件。 文件从 remote 目录(S3 存储桶)传输到部署应用程序的 local 目录。spring-doc.cadn.net.cn

源发出的消息默认以字节数组提供。但是,可以使用--mode选项进行自定义:spring-doc.cadn.net.cn

使用 --mode=lines 时,还可以提供附加选项 --withMarkers=true。如果设置为 true,底层的 FileSplitter 将在实际数据之前和之后发出额外的 文件开始文件结束 标记消息。这两个附加标记消息的有效载荷类型为 FileSplitter.FileMarkerwithMarkers 的默认值是 false,除非明确设置。spring-doc.cadn.net.cn

另请参阅MetadataStore选项,了解用于防止重启时重复消息的共享持久存储配置。
spring-doc.cadn.net.cn

5.13.1. mode = lines

标题:
负载:

每行一个Stringspring-doc.cadn.net.cn

第一行可选地由带有START标记有效负载的消息前导。
最后一行可选地由带有END标记有效负载的消息后置。
spring-doc.cadn.net.cn

标记的存在和格式由with-markersmarkers-json属性确定。spring-doc.cadn.net.cn

5.13.2. 模式 = ref

标题:
负载:

一个 java.io.File 对象。spring-doc.cadn.net.cn

5.13.3. 选项

代码s3-source有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

file.consumer
markers-json

当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:true)spring-doc.cadn.net.cn

模式

用于文件读取源的文件读取模式。值为 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 内容作为字节。(FileReadingMode,默认:<none>,可能的值:reflinescontents)spring-doc.cadn.net.cn

with-markers

设置为 true 可在数据之前/之后发出文件开始/结束标记消息。仅当 FileReadingMode 设置为 'lines' 时有效。(Boolean,默认值:<none>)spring-doc.cadn.net.cn

metadata.store.dynamo-db
create-delay

创建表重试之间的延迟。(整数,默认:1)spring-doc.cadn.net.cn

create-retries

创建表请求的重试次数。(整数,默认:25)spring-doc.cadn.net.cn

read-capacity

表上的读取容量。(长整型,默认:1spring-doc.cadn.net.cn

表格

用于元数据的表名。(字符串,默认:<none>)spring-doc.cadn.net.cn

time-to-live

表项的TTL。 (整数,默认:<none>spring-doc.cadn.net.cn

write-capacity

表上的写入容量。(长整型,默认:1)spring-doc.cadn.net.cn

metadata.store.jdbc
区域

使用此存储持久化的消息的唯一分组标识符。(字符串,默认:DEFAULT)spring-doc.cadn.net.cn

table-prefix

自定义表名的前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store.mongo-db
集合

元数据的 MongoDB 集合名称。(字符串,默认:metadataStore)spring-doc.cadn.net.cn

metadata.store.redis

元数据的 Redis 键。 (字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store
类型

指定要配置的元数据存储类型(默认为“内存”)。若要使用持久化存储,必须包含相应的 Spring Integration 依赖。(StoreType,默认:<none>,可能值:mongodbredisdynamodbjdbczookeeperhazelcastmemory)spring-doc.cadn.net.cn

metadata.store.zookeeper
connect-string

Zookeeper连接字符串格式为HOST:PORT。(字符串,默认:127.0.0.1:2181spring-doc.cadn.net.cn

编码

存储数据到Zookeeper时使用的编码。(字符集,默认:UTF-8)spring-doc.cadn.net.cn

retry-interval

Zookeeper操作的重试间隔,单位为毫秒。(Integer,默认:1000)spring-doc.cadn.net.cn

根目录

根节点 - 存储条目是此节点的子节点。(字符串,默认:/SpringIntegration-MetadataStore)spring-doc.cadn.net.cn

s3.supplier
auto-create-local-dir

是否创建本地目录。(布尔类型,默认:true)spring-doc.cadn.net.cn

delete-remote-files

处理完成后是否删除远程文件。(布尔值,默认:false)spring-doc.cadn.net.cn

filename-pattern

用于过滤远程文件的模式。(字符串,默认:<none>)spring-doc.cadn.net.cn

filename-regex

用于筛选远程文件的正则表达式。(模式,默认:<none>)spring-doc.cadn.net.cn

list-only

设置为 true 可以返回 s3 对象元数据而不需将文件复制到本地目录。(Boolean,默认:false)spring-doc.cadn.net.cn

local-dir

本地目录,用于存储文件。(File,默认:<none>)spring-doc.cadn.net.cn

preserve-timestamp

是否将远程文件的时间戳传递给本地文件。(布尔类型,默认:true)spring-doc.cadn.net.cn

remote-dir

AWS S3存储桶资源。(字符串,默认:bucketspring-doc.cadn.net.cn

remote-file-separator

远程文件分隔符。 (字符串,默认:/)spring-doc.cadn.net.cn

tmp-file-suffix

临时文件后缀。(字符串,默认:.tmpspring-doc.cadn.net.cn

spring.cloud.aws.credentials
access-key

与静态提供程序一起使用的访问密钥。(字符串,默认值:<none>)spring-doc.cadn.net.cn

instance-profile

使用默认配置来配置实例配置文件凭据提供程序。(布尔类型,默认:false)spring-doc.cadn.net.cn

个人资料

AWS 配置文件。 (配置文件,缺省:<none>)spring-doc.cadn.net.cn

secret-key

使用静态提供程序的密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.cloud.aws.region
instance-profile

使用默认配置设置实例配置文件区域提供程序。(布尔值,默认:false)spring-doc.cadn.net.cn

个人资料

AWS 配置文件。 (配置文件,缺省:<none>)spring-doc.cadn.net.cn

静态

<文档缺失> (字符串,默认:<none>spring-doc.cadn.net.cn

spring.cloud.aws.s3
accelerate-mode-enabled

访问 S3 时启用加速端点的选项。加速端点通过使用 Amazon CloudFront 的全球分布边缘位置,允许更快地传输对象。(布尔值,默认:<none>)spring-doc.cadn.net.cn

checksum-validation-enabled

选项:禁用对存储在S3中的对象校验和进行验证。(布尔类型,默认:<none>)spring-doc.cadn.net.cn

chunked-encoding-enabled

用于在对 {@link software.amazon.awssdk.services.s3.model.PutObjectRequest} 和 {@link software.amazon.awssdk.services.s3.model.UploadPartRequest} 签名请求负载时启用分块编码的选项。(布尔值,默认:<none>)spring-doc.cadn.net.cn

cross-region-enabled

启用跨区域存储桶访问。(布尔类型,默认为<none>spring-doc.cadn.net.cn

端点

覆盖默认端点。(URI,默认:<none>)spring-doc.cadn.net.cn

path-style-access-enabled

启用使用路径样式访问方式来访问S3对象,而不是DNS样式访问。推荐使用DNS样式访问,因为它在访问S3时会实现更好的负载均衡。(布尔值,默认:<none>)spring-doc.cadn.net.cn

区域

覆盖默认区域。 (字符串,默认:<none>)spring-doc.cadn.net.cn

use-arn-region-enabled

如果在作为具有与客户端配置区域不同的区域的S3操作目标时传递了S3资源ARN,则必须将此标志设置为'true',以允许客户端向ARN中指定的区域进行跨区域调用,否则将抛出异常。(布尔型,默认:<none>)spring-doc.cadn.net.cn

5.13.4. Amazon AWS 常用选项

Amazon S3 源(与其他所有 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,并且其自动配置类由 Spring Boot 自动使用。有关所需和有用的自动配置属性,请参阅他们的文档。spring-doc.cadn.net.cn

5.13.5. 示例

java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines

5.14. SFTP 源

此源应用程序支持使用SFTP协议传输文件。文件从remote目录传输到部署应用的local目录。默认情况下,源发出的消息为字节数组。但是,可以使用--mode选项进行自定义:spring-doc.cadn.net.cn

使用 --mode=lines 时,还可以提供附加选项 --withMarkers=true。如果设置为 true,底层的 FileSplitter 将在实际数据之前和之后发出额外的 文件开始文件结束 标记消息。这两个附加标记消息的有效载荷类型为 FileSplitter.FileMarkerwithMarkers 的默认值是 false,除非明确设置。spring-doc.cadn.net.cn

有关高级配置选项,请参阅sftp-supplierspring-doc.cadn.net.cn

另请参阅MetadataStore选项,了解用于防止重启时重复消息的共享持久存储配置。
spring-doc.cadn.net.cn

5.14.1. 输入

N/A(从SFTP服务器获取文件)。spring-doc.cadn.net.cn

5.14.2. 输出

模式 = 内容
标题:
负载:

用文件内容填充的 byte[]spring-doc.cadn.net.cn

模式 = 行
标题:
负载:

每行一个Stringspring-doc.cadn.net.cn

第一行可选地由带有START标记有效负载的消息前导。
最后一行可选地由带有END标记有效负载的消息后置。
spring-doc.cadn.net.cn

标记的存在和格式由with-markersmarkers-json属性确定。spring-doc.cadn.net.cn

模式 = 参考
标题:
负载:

一个 java.io.File 对象。spring-doc.cadn.net.cn

5.14.3. 选项

ftp源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

file.consumer
markers-json

当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:true)spring-doc.cadn.net.cn

模式

用于文件读取源的文件读取模式。值为 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 内容作为字节。(FileReadingMode,默认:<none>,可能的值:reflinescontents)spring-doc.cadn.net.cn

with-markers

设置为 true 可在数据之前/之后发出文件开始/结束标记消息。仅当 FileReadingMode 设置为 'lines' 时有效。(Boolean,默认值:<none>)spring-doc.cadn.net.cn

metadata.store.dynamo-db
create-delay

创建表重试之间的延迟。(整数,默认:1)spring-doc.cadn.net.cn

create-retries

创建表请求的重试次数。(整数,默认:25)spring-doc.cadn.net.cn

read-capacity

表上的读取容量。(长整型,默认:1spring-doc.cadn.net.cn

表格

用于元数据的表名。(字符串,默认:<none>)spring-doc.cadn.net.cn

time-to-live

表项的TTL。 (整数,默认:<none>spring-doc.cadn.net.cn

write-capacity

表上的写入容量。(长整型,默认:1)spring-doc.cadn.net.cn

metadata.store.jdbc
区域

使用此存储持久化的消息的唯一分组标识符。(字符串,默认:DEFAULT)spring-doc.cadn.net.cn

table-prefix

自定义表名的前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store.mongo-db
集合

元数据的 MongoDB 集合名称。(字符串,默认:metadataStore)spring-doc.cadn.net.cn

metadata.store.redis

元数据的 Redis 键。 (字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store
类型

指定要配置的元数据存储类型(默认为“内存”)。若要使用持久化存储,必须包含相应的 Spring Integration 依赖。(StoreType,默认:<none>,可能值:mongodbredisdynamodbjdbczookeeperhazelcastmemory)spring-doc.cadn.net.cn

metadata.store.zookeeper
connect-string

Zookeeper连接字符串格式为HOST:PORT。(字符串,默认:127.0.0.1:2181spring-doc.cadn.net.cn

编码

存储数据到Zookeeper时使用的编码。(字符集,默认:UTF-8)spring-doc.cadn.net.cn

retry-interval

Zookeeper操作的重试间隔,单位为毫秒。(Integer,默认:1000)spring-doc.cadn.net.cn

根目录

根节点 - 存储条目是此节点的子节点。(字符串,默认:/SpringIntegration-MetadataStore)spring-doc.cadn.net.cn

sftp.supplier
auto-create-local-dir

如果不存在,则设置为 true 以创建本地目录。(Boolean,默认:true)spring-doc.cadn.net.cn

delay-when-empty

未检测到新文件时的延迟持续时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

delete-remote-files

设置为true表示在成功传输后删除远程文件。(布尔类型,默认:false)spring-doc.cadn.net.cn

目录

工厂“name.directory”对的列表。(String[],默认:<none>)spring-doc.cadn.net.cn

工厂

一个工厂名称到工厂的映射。(Map<String, Factory>, 默认值: <none>)spring-doc.cadn.net.cn

公平

多个服务器/目录公平轮询。默认为false,因此如果源有多个条目,则在访问其他源之前会先接收这些条目。(布尔型,默认:false)spring-doc.cadn.net.cn

filename-pattern

一个过滤器模式,用于匹配要传输的文件名。(字符串,默认:<none>)spring-doc.cadn.net.cn

filename-regex

用于匹配要传输的文件名的过滤正则表达式模式。(Pattern,默认值:<none>)spring-doc.cadn.net.cn

list-only

设置为 true 可以在不包含整个有效负载的情况下返回文件元数据。(布尔值,默认:false)spring-doc.cadn.net.cn

local-dir

用于文件传输的本地目录。(文件,默认:<none>)spring-doc.cadn.net.cn

max-fetch

每次轮询要获取的远程文件的最大数量;默认为无限制。在列出文件或构建任务启动请求时无效。(整数,默认:<none>)spring-doc.cadn.net.cn

preserve-timestamp

设置为 true 可保留原始时间戳。(布尔类型,默认值:true)spring-doc.cadn.net.cn

remote-dir

远程 FTP 目录。(字符串,默认:/spring-doc.cadn.net.cn

remote-file-separator

远程文件分隔符。(字符串,默认:/)spring-doc.cadn.net.cn

rename-remote-files-to

一个 SpEL 表达式,解析为新名称,在远程文件成功传输后必须重命名。(表达式,默认:<none>)spring-doc.cadn.net.cn

设置为 true 可以流式传输文件,而不是复制到本地目录。(Boolean,默认:false)spring-doc.cadn.net.cn

tmp-file-suffix

传输进行期间使用的后缀。(字符串,默认:.tmp)spring-doc.cadn.net.cn

sftp.supplier.factory
allow-unknown-keys

允许未知或已更改的密钥。 (Boolean,默认:false)spring-doc.cadn.net.cn

主机

服务器的主机名。(字符串,默认:localhost)spring-doc.cadn.net.cn

known-hosts-expression

一个解析为已知主机文件位置的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

pass-phrase

用户私钥的密码。(字符串,默认:<empty string>)spring-doc.cadn.net.cn

密码

连接到服务器时使用的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

服务器的端口。(整数,默认:22)spring-doc.cadn.net.cn

private-key

用户私钥的位置。 (资源,默认:<none>)spring-doc.cadn.net.cn

用户名

连接服务器时要使用的用户名。(字符串,默认值:<none>)spring-doc.cadn.net.cn

sftp.supplier.sort-by
属性

文件列表条目排序的属性(FILENAME、ATIME:上次访问时间、MTIME:上次修改时间)。(属性,默认:<none>spring-doc.cadn.net.cn

目录

排序方向(升序或降序)。 (Dir,默认:<none>spring-doc.cadn.net.cn

5.14.4. 示例

java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.mode=lines --sftp.supplier.factory.host=sftpserver \
         --sftp.supplier.factory.username=user --sftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo

5.15. 系统日志

syslog源通过UDP、TCP或两者同时接收SYSLOG数据包。支持RFC3164(BSD)和RFC5424格式。spring-doc.cadn.net.cn

5.15.1. 选项

syslog.supplier.buffer-size

解码消息时使用的缓冲区大小;较大的消息将被拒绝。(Integer,默认:2048)spring-doc.cadn.net.cn

syslog.supplier.nio

是否使用NIO(在支持大量连接时)。(布尔值,默认:falsespring-doc.cadn.net.cn

syslog.supplier.port

要监听的端口。(整数,默认:1514)spring-doc.cadn.net.cn

syslog.supplier.protocol

用于 SYSLOG 的协议(tcp 或 udp)。(协议,缺省:<none>,有效值:tcpudpboth)spring-doc.cadn.net.cn

syslog.supplier.reverse-lookup

是否对传入的套接字执行反向查找。(布尔值,默认:false)spring-doc.cadn.net.cn

syslog.supplier.rfc

"5424" 或 "3164" - 根据 RFC 的 syslog 格式;3164 又称作 "BSD" 格式。 (字符串,默认值:3164)spring-doc.cadn.net.cn

syslog.supplier.socket-timeout

套接字超时时间。(整型,默认:0)spring-doc.cadn.net.cn

5.16. TCP

源代码tcp用作服务器,允许远程方连接到它并通过原始tcp套接字提交数据。spring-doc.cadn.net.cn

TCP是一种流式协议,需要某种机制来在传输线上对消息进行定界。提供了多种解码器,默认的是'CRLF',与Telnet兼容。spring-doc.cadn.net.cn

由TCP源应用程序生成的消息的byte[]有效负载。spring-doc.cadn.net.cn

5.16.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

TCP
中文

是否使用 NIO。(布尔值,默认:falsespring-doc.cadn.net.cn

端口

要监听的端口号;0 表示操作系统选择一个端口。(整数,默认:1234)spring-doc.cadn.net.cn

reverse-lookup

对远程IP地址执行反向DNS查找;如果为false,则仅在消息头中包含IP地址。(布尔值,默认:falsespring-doc.cadn.net.cn

socket-timeout

套接字在未收到数据前超时关闭的时间(毫秒)。(Integer,缺省:120000)spring-doc.cadn.net.cn

use-direct-buffers

是否使用直接缓冲区。(布尔值,默认:false)spring-doc.cadn.net.cn

tcp.supplier
buffer-size

解码消息时使用的缓冲区大小;较大的消息将被拒绝。(Integer,默认:2048)spring-doc.cadn.net.cn

解码器

接收消息时使用的解码器。(编码,默认:<none>,可选值:CRLFLFNULLSTXETXRAWL1L2L4)spring-doc.cadn.net.cn

5.16.2. 可用解码器

文本数据
CRLF(默认)

文本由回车符(0x0d)后跟换行符(0x0a)终止spring-doc.cadn.net.cn

LF

以换行符终止的文本(0x0a)spring-doc.cadn.net.cn

null

以空字节(0x00)终止的文本spring-doc.cadn.net.cn

STXETX

以 STX (0x02) 开头并以 ETX (0x03) 结尾的文本spring-doc.cadn.net.cn

文本和二进制数据
原始内容

无结构 - 客户端通过关闭套接字来表示一条完整的消息spring-doc.cadn.net.cn

L1

以一个字节(无符号)长度字段开头的数据(支持最多255字节)spring-doc.cadn.net.cn

L2

以两个字节(无符号)长度字段开头的数据(最多为216-1个字节)spring-doc.cadn.net.cn

L4

以四个字节(有符号)长度字段开头的数据(最多可达231-1字节)spring-doc.cadn.net.cn

5.17. 时间源

时间源将每隔一段时间简单地发出包含当前时间的字符串。spring-doc.cadn.net.cn

5.17.1. 可选项

时间源具有以下选项:timespring-doc.cadn.net.cn

spring.integration.poller.cron

Cron 表达式用于轮询。与 'fixedDelay' 和 'fixedRate' 相互排斥。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.integration.poller.fixed-delay

轮询延迟周期。与 'cron' 和 'fixedRate' 相互排斥。(持续时间,默认:<none>)spring-doc.cadn.net.cn

spring.integration.poller.fixed-rate

轮询速率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认:<none>)spring-doc.cadn.net.cn

spring.integration.poller.initial-delay

轮询初始延迟。对 'fixedDelay' 和 'fixedRate' 生效;对于 'cron' 被忽略。(持续时间,默认:<none>)spring-doc.cadn.net.cn

spring.integration.poller.max-messages-per-poll

轮询周期内每次轮询要获取的最大消息数量。(整数,默认:<none>)spring-doc.cadn.net.cn

spring.integration.poller.receive-timeout

轮询时等待消息的时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

time.date-format

日期值的格式。(字符串,默认:MM/dd/yy HH:mm:ss)spring-doc.cadn.net.cn

5.18. Twitter 消息源

重复获取最近30天内的直接消息(包括已发送和接收的消息),并按时间倒序排列。
缓解后的消息被缓存(在MetadataStore缓存中)以防止重复。
默认使用内存中的SimpleMetadataStorespring-doc.cadn.net.cn

twitter.message.source.count控制返回消息的数量。spring-doc.cadn.net.cn

属性spring.cloud.stream.poller控制 the message poll interval。
必须与使用的API速率限制一致
spring-doc.cadn.net.cn

5.18.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

spring.integration.poller
定时任务

Cron 表达式用于轮询。与 'fixedDelay' 和 'fixedRate' 相互排斥。(字符串,默认:<none>)spring-doc.cadn.net.cn

fixed-delay

轮询延迟周期。与 'cron' 和 'fixedRate' 相互排斥。(持续时间,默认:<none>)spring-doc.cadn.net.cn

fixed-rate

轮询速率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认:<none>)spring-doc.cadn.net.cn

initial-delay

轮询初始延迟。对 'fixedDelay' 和 'fixedRate' 生效;对于 'cron' 被忽略。(持续时间,默认:<none>)spring-doc.cadn.net.cn

max-messages-per-poll

轮询周期内每次轮询要获取的最大消息数量。(整数,默认:<none>)spring-doc.cadn.net.cn

receive-timeout

轮询时等待消息的时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

twitter.connection
access-token

您的 Twitter Tokens。 (字符串,默认值:<none>spring-doc.cadn.net.cn

access-token-secret

您的 Twitter Tokens密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

consumer-key

您的 Twitter 密钥。(字符串,默认:<none>spring-doc.cadn.net.cn

consumer-secret

您的 Twitter 密钥。 (字符串,默认值:<none>)spring-doc.cadn.net.cn

debug-enabled

启用 Twitter4J 调试模式。(布尔值,默认:falsespring-doc.cadn.net.cn

raw-json

启用缓存由 Twitter API 返回的原始(未经处理)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 JSON 表示形式。当设置为 True 时,结果将使用原始的 Twitter API JSON 表示形式。(布尔值,默认:truespring-doc.cadn.net.cn

twitter.message.source
计数

要返回的事件最大数量。默认为20,最多50。(整数,默认:20)spring-doc.cadn.net.cn

5.19. Twitter搜索源

Twitter 的 标准搜索 API(search/tweets)允许对最近或热门推文的索引进行简单查询。这个 Source 提供了针对过去 7 天内发布的新近推文抽样的连续搜索。是 'public' API 集合的一部分。spring-doc.cadn.net.cn

返回与指定查询匹配的相关推文集合。spring-doc.cadn.net.cn

使用spring.cloud.stream.poller属性来控制连续搜索请求之间的间隔。速率限制-每30分钟窗口内180个请求(例如,约6次/分钟,约1次/10秒)spring-doc.cadn.net.cn

twitter.search查询属性允许通过关键字进行查询,并根据时间和地理位置过滤结果。spring-doc.cadn.net.cn

代码twitter.search.counttwitter.search.page控制搜索API的结果分页。spring-doc.cadn.net.cn

注意:Twitter 的搜索服务以及由此扩展的搜索 API 并不打算作为推文的全面来源。并非所有的推文都会被索引或通过搜索界面提供。spring-doc.cadn.net.cn

5.19.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

spring.integration.poller
定时任务

Cron 表达式用于轮询。与 'fixedDelay' 和 'fixedRate' 相互排斥。(字符串,默认:<none>)spring-doc.cadn.net.cn

fixed-delay

轮询延迟周期。与 'cron' 和 'fixedRate' 相互排斥。(持续时间,默认:<none>)spring-doc.cadn.net.cn

fixed-rate

轮询速率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认:<none>)spring-doc.cadn.net.cn

initial-delay

轮询初始延迟。对 'fixedDelay' 和 'fixedRate' 生效;对于 'cron' 被忽略。(持续时间,默认:<none>)spring-doc.cadn.net.cn

max-messages-per-poll

轮询周期内每次轮询要获取的最大消息数量。(整数,默认:<none>)spring-doc.cadn.net.cn

receive-timeout

轮询时等待消息的时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

twitter.connection
access-token

您的 Twitter Tokens。 (字符串,默认值:<none>spring-doc.cadn.net.cn

access-token-secret

您的 Twitter Tokens密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

consumer-key

您的 Twitter 密钥。(字符串,默认:<none>spring-doc.cadn.net.cn

consumer-secret

您的 Twitter 密钥。 (字符串,默认值:<none>)spring-doc.cadn.net.cn

debug-enabled

启用 Twitter4J 调试模式。(布尔值,默认:falsespring-doc.cadn.net.cn

raw-json

启用缓存由 Twitter API 返回的原始(未经处理)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 JSON 表示形式。当设置为 True 时,结果将使用原始的 Twitter API JSON 表示形式。(布尔值,默认:truespring-doc.cadn.net.cn

twitter.search
计数

每页返回的推文数量(例如,每次单一请求),最多为100个。(整数,默认:100spring-doc.cadn.net.cn

语言

将搜索到的推文限制为指定语言,由 http://en.wikipedia.org/wiki/ISO_639-1 定义。(字符串,默认:<none>spring-doc.cadn.net.cn

页面

从最近的推文开始搜索之前,要向后搜索的页面数量(例如请求),以在最近年份的推文中再次启动搜索。总共向后搜索的推文数为(页码 * 数量)(整数,默认:3spring-doc.cadn.net.cn

查询

通过搜索查询字符串搜索推文。(字符串,默认:<none>)spring-doc.cadn.net.cn

restart-from-most-recent-on-empty-response

在空响应时,从最近的推文重新开始搜索。仅在第一次重启后应用(例如,当 since_id != UNBOUNDED 时)(布尔值,默认:falsespring-doc.cadn.net.cn

result-type

指定您偏好的搜索结果类型。当前默认值为“混合”。有效值包括:混合:在响应中同时包含热门和实时结果。最近:仅在响应中返回最新的结果。热门:仅在响应中返回最热门的结果。(ResultType,默认:<none>,可能值:popularmixedrecent)spring-doc.cadn.net.cn

Since

如果指定,返回自给定日期以来的推文。日期应按 YYYY-MM-DD 格式设置。(字符串,默认:<none>spring-doc.cadn.net.cn

twitter.search.geocode
纬度

用户的纬度。 (双精度浮点数,默认值:-1)spring-doc.cadn.net.cn

经度

用户的经度。(双精度浮点数,默认:-1spring-doc.cadn.net.cn

半径

以(纬度,经度)点为中心的半径(单位:千米)。(双精度浮点数,默认:-1spring-doc.cadn.net.cn

5.20. Twitter 流源

实时推文流 过滤采样 API 支持。spring-doc.cadn.net.cn

  • The Filter API 返回与一个或多个过滤器谓词匹配的公共状态。 使用多个参数可以使用单个连接到流式API。 提示:trackfollowlocations 字段用运算符组合! 包含 track=foofollow=1234 的查询返回匹配 test由用户 1234 创建的推文。spring-doc.cadn.net.cn

  • Sample API 返回所有公共状态的小随机样本。
    默认访问级别的推文返回的结果相同,因此如果两个不同的客户端连接到此端点,则它们会看到相同的推文。spring-doc.cadn.net.cn

默认访问级别允许最多400个跟踪关键字、5,000个关注用户ID以及25个0.1-360度的位置框。spring-doc.cadn.net.cn

5.20.1. 可选项

按前缀分组的属性:spring-doc.cadn.net.cn

twitter.connection
access-token

您的 Twitter Tokens。 (字符串,默认值:<none>spring-doc.cadn.net.cn

access-token-secret

您的 Twitter Tokens密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

consumer-key

您的 Twitter 密钥。(字符串,默认:<none>spring-doc.cadn.net.cn

consumer-secret

您的 Twitter 密钥。 (字符串,默认值:<none>)spring-doc.cadn.net.cn

debug-enabled

启用 Twitter4J 调试模式。(布尔值,默认:falsespring-doc.cadn.net.cn

raw-json

启用缓存由 Twitter API 返回的原始(未经处理)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 JSON 表示形式。当设置为 True 时,结果将使用原始的 Twitter API JSON 表示形式。(布尔值,默认:truespring-doc.cadn.net.cn

twitter.stream.filter
计数

指示在转换到实时流之前要流式传输的先前状态的数量。(整数,默认:0spring-doc.cadn.net.cn

filter-level

过滤器级别限制流中显示的推文,仅包括具有最小filterLevel属性值的推文。可以是none、low或medium之一。(FilterLevel,默认:<none>)spring-doc.cadn.net.cn

关注

指定要接收公共推文的用户,按ID。(List<Long>, 默认:<none>)spring-doc.cadn.net.cn

语言

指定流的推文语言。(List<String>, 默认: <none>)spring-doc.cadn.net.cn

位置

要跟踪的位置。内部表示为二维数组。边界框无效:52.38,4.90,51.51,-0.12。第一对必须是框的西南角(List<BoundingBox>,默认:<none>)spring-doc.cadn.net.cn

跟踪

指定要跟踪的关键词。(List<String>, 默认: <none>)spring-doc.cadn.net.cn

twitter.stream
类型

<documentation missing> (数据流类型,默认值:<none>,可选值:samplefilterfirehoselink)spring-doc.cadn.net.cn

5.21. WebSocket 源

通过 WebSocket 生产消息的 Websocket 源。spring-doc.cadn.net.cn

5.21.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

websocket.supplier
allowed-origins

允许的来源。 (字符串,默认:*)spring-doc.cadn.net.cn

路径

服务器WebSocket处理器暴露的路径。(字符串,默认:/websocket)spring-doc.cadn.net.cn

websocket.supplier.sock-js
启用

在服务器上启用 SockJS 服务。默认为 'false'(Boolean,默认:false)spring-doc.cadn.net.cn

5.21.2. 示例

要验证 websocket-source 是否从 WebSocket 客户端接收消息,可以使用以下简单的端到端设置。spring-doc.cadn.net.cn

步骤1:启动kafka
步骤2:将websocket-source部署到特定端口,例如8080
第3步:在端口8080路径“/websocket”上连接一个WebSocket客户端,并发送一些消息。

您可以启动一个kafka控制台消费者并在那里查看消息。spring-doc.cadn.net.cn

5.22. XMPP 源

"xmpp"源可用来从XMPP服务器接收消息。spring-doc.cadn.net.cn

5.22.1. 输入

5.22.2. 输出

负载

5.22.3. 选项

xmpp源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

xmpp.factory
主机

要连接的 XMPP 主机服务器。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

连接用户的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

连接到主机的端口。 - 默认客户端端口:5222 (整数,默认:5222)spring-doc.cadn.net.cn

资源

要绑定到 XMPP 主机上的资源。- 可以为空,如果未设置服务器会自动生成一个 (字符串,默认为 <none>)spring-doc.cadn.net.cn

security-mode

<文档缺失> (安全模式,默认值:<none>,可选值:requiredifpossibledisabled)spring-doc.cadn.net.cn

service-name

为 XMPP 域设置的服务名称。(字符串,默认值:<none>)spring-doc.cadn.net.cn

subscription-mode

<documentation missing> (订阅模式,默认值: <none>,可能的取值: accept_all,reject_all,manual)spring-doc.cadn.net.cn

user

用户连接时应作为。 (字符串,默认:<none>)spring-doc.cadn.net.cn

xmpp.supplier
payload-expression

<文档缺失> (表达式,缺省: <none>)spring-doc.cadn.net.cn

stanza-filter

<documentation missing> (消息过滤器,默认: <none>)spring-doc.cadn.net.cn

也请参见 Spring Boot 文档 对于 broker 连接和监听器属性的额外属性。spring-doc.cadn.net.cn

5.22.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在以下位置找到相应的绑定器项目。<br>然后进入其中一个文件夹并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

5.22.5. 示例

java -jar xmpp-source.jar --xmpp.factory.host=localhost --xmpp.factory.port=5222 --xmpp.factory.user=jane --xmpp.factory.password=secret --xmpp.factory.service-name=localhost

5.23. ZeroMQ 源

“zeromq”源启用从ZeroMQ接收消息。spring-doc.cadn.net.cn

5.23.1. 输入

5.23.2. 输出

负载

5.23.3. 选项

Java开发Spring框架的英文网站有以下选项:zeromqspring-doc.cadn.net.cn

zeromq.supplier.bind-port

创建 ZeroMQ 套接字时绑定的端口;0 表示选择一个随机端口。(整数,默认:0)spring-doc.cadn.net.cn

zeromq.supplier.connect-url

连接到 ZeroMQ 套接字的 URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

zeromq.supplier.consume-delay

当 ZeroMQ Socket 没有接收到数据时,消费的延迟。(持续时间,默认:1s)spring-doc.cadn.net.cn

zeromq.supplier.socket-type

连接应采用的套接字类型。(SocketType,默认值:<none>,可选值:PAIRPUBSUBREQREPDEALERROUTERPULLPUSHXPUBXSUBSTREAMCLIENTSERVERRADIODISHCHANNELPEERRAWSCATTERGATHER)spring-doc.cadn.net.cn

zeromq.supplier.topics

要订阅的主题。(字符串数组,默认:[])spring-doc.cadn.net.cn

也请参见 Spring Boot 文档 对于 broker 连接和监听器属性的额外属性。spring-doc.cadn.net.cn

5.23.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在以下位置找到相应的绑定器项目。<br>然后进入其中一个文件夹并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

5.23.5. 示例

java -jar zeromq-source.jar --zeromq.supplier.connectUrl=tcp://server:port --zeromq.supplier.topics=

6. 处理器

6.1. 聚合处理器

聚合处理器使应用程序能够将传入的消息分组并释放到输出目标中。spring-doc.cadn.net.cn

java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbcspring-doc.cadn.net.cn

如果你想将其运行在 RabbitMQ 上,请将 Kafka 更改为 Rabbit。spring-doc.cadn.net.cn

6.1.1. 负载

如果输入负载为 byte[] 并且内容类型标题为 JSON,则 JsonBytesToMap 函数尝试反序列化此负载以获得更好的数据表示形式,以便在聚合器函数的输出中使用。此外,这种 Map 数据表示方式使得从下面提到的 SpEL 表达式轻松访问负载内容成为可能。否则(包括反序列化错误),输入负载保持不变——目标应用程序配置将其转换为所需的形式。spring-doc.cadn.net.cn

6.1.2. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

聚合器
聚合

用于聚合策略的SpEL表达式。默认为负载集合。(表达式,默认:<none>)spring-doc.cadn.net.cn

相关性

用于关联键的 SpEL 表达式。默认为 correlationId 标头。(表达式,默认:<none>)spring-doc.cadn.net.cn

group-timeout

SpEL 表达式用于设置超时未完成组的过期时间。(表达式,默认:<none>)spring-doc.cadn.net.cn

message-store-entity

持久化消息存储实体:关系型数据库中的表前缀,MongoDb 中的集合名称等。(字符串,默认:<none>)spring-doc.cadn.net.cn

message-store-type

消息存储类型。 (字符串,默认:<none>spring-doc.cadn.net.cn

发布

发布策略的 SpEL 表达式。默认情况下基于 sequenceSize 请求头。(表达式,默认:<none>)spring-doc.cadn.net.cn

spring.data.mongodb
additional-hosts

附加服务器主机。不能与 URI 设置同时使用,也不能在未指定 'host' 的情况下设置。附加主机将使用默认的 MongoDB 端口 27017。如果要使用其他端口,可以使用 "host:port" 语法。(List<String>, 默认值: <none>)spring-doc.cadn.net.cn

authentication-database

认证数据库名称。(字符串,默认:<none>spring-doc.cadn.net.cn

auto-index-creation

是否启用自动索引创建。(布尔类型,默认:<none>)spring-doc.cadn.net.cn

数据库

数据库名。覆盖URI中的数据库。(字符串,默认:<none>spring-doc.cadn.net.cn

field-naming-strategy

要使用的 FieldNamingStrategy 的完全限定名。(Class<?>, 默认:<none>)spring-doc.cadn.net.cn

主机

Mongo服务器主机。不能与URI一起设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

登录 MongoDB 服务器的密码。不能与 URI 一起设置。(字符数组,默认:<none>)spring-doc.cadn.net.cn

端口

Mongo服务器端口。不能与URI一起设置。(Integer,默认:<none>spring-doc.cadn.net.cn

replica-set-name

群集所需的副本集名称。不能与URI一起设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uri

Mongo 数据库连接字符串。此选项会覆盖主机、端口、用户名和密码。(String,默认值:mongodb://localhost/testspring-doc.cadn.net.cn

用户名

登录 MongoDB 服务器的用户。不能与 URI 同时设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uuid-representation

转换 UUID 为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认:java-legacy,可能的值:UNSPECIFIEDSTANDARDC_SHARP_LEGACYJAVA_LEGACYPYTHON_LEGACY)spring-doc.cadn.net.cn

spring.data.mongodb.gridfs

GridFS存储桶名称。 (字符串,默认值:<none>)spring-doc.cadn.net.cn

数据库

GridFS 数据库名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.mongodb.ssl
捆绑

SSL捆绑包名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

启用

是否启用SSL支持。如果提供了"bundle",则会自动启用,除非另外指定。(布尔值,默认:<none>)spring-doc.cadn.net.cn

spring.data.redis
client-name

连接上要设置的客户端名称,通过 CLIENT SETNAME 设置。 (字符串,默认:<none>)spring-doc.cadn.net.cn

client-type

要使用的客户端类型。默认情况下,根据类路径自动检测。(ClientType,默认值:<none>,可能的取值:LETTUCEJEDIS)spring-doc.cadn.net.cn

connect-timeout

连接超时。(持续时间,默认:<none>)spring-doc.cadn.net.cn

数据库

连接工厂使用的数据库索引。(整数,默认:0)spring-doc.cadn.net.cn

主机

Redis服务器主机。(字符串,默认:localhostspring-doc.cadn.net.cn

密码

Redis服务器的登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

Redis服务器端口。(整数,默认:6379spring-doc.cadn.net.cn

timeout

读取超时时间。(持续时间,默认:<none>)spring-doc.cadn.net.cn

url

连接 URL。覆盖主机、端口、用户名和密码。示例:redis://user:[电子邮件 protected]:6379 (字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

登录 Redis 服务器的用户名。(字符串,默认值:<none>)spring-doc.cadn.net.cn

spring.data.redis.cluster
max-redirects

执行命令跨越集群时要遵循的最大重定向次数。(整数,默认:<none>)spring-doc.cadn.net.cn

节点

用于引导的“主机:端口”对的逗号分隔列表。这表示一个“初始”的集群节点列表,且至少需要有一个条目。(List<String>, 默认值:<none>)spring-doc.cadn.net.cn

spring.data.redis.jedis.pool
启用

是否启用连接池。如果可用“commons-pool2”,则会自动启用。使用Jedis时,在哨兵模式下,默认启用连接池,此设置仅适用于单节点配置。(布尔型,缺省值:<none>)spring-doc.cadn.net.cn

max-active

连接池在某一时间可以分配的最大连接数。使用负值表示无限制。(Integer,默认:8)spring-doc.cadn.net.cn

max-idle

连接池中"空闲"连接的最大数量。使用负值表示空闲连接数没有上限。(Integer,默认:8)spring-doc.cadn.net.cn

max-wait

连接池耗尽时,分配连接应阻塞的最大时间(以抛出异常)。使用负值表示无限期阻塞。(持续时间,默认:-1ms)spring-doc.cadn.net.cn

min-idle

池中要维持的最少空闲连接数的目标。只有当此设置和驱逐运行之间的间隔都为正时,才会产生效果。(整数,默认:0)spring-doc.cadn.net.cn

time-between-eviction-runs

空闲对象驱逐线程运行的时间间隔。如果为正数,则启动空闲对象驱逐线程,否则不执行任何空闲对象清除操作。(持续时间,默认:<none>)spring-doc.cadn.net.cn

spring.data.redis.lettuce.pool
启用

是否启用连接池。如果可用“commons-pool2”,则会自动启用。使用Jedis时,在哨兵模式下,默认启用连接池,此设置仅适用于单节点配置。(布尔型,缺省值:<none>)spring-doc.cadn.net.cn

max-active

连接池在某一时间可以分配的最大连接数。使用负值表示无限制。(Integer,默认:8)spring-doc.cadn.net.cn

max-idle

连接池中"空闲"连接的最大数量。使用负值表示空闲连接数没有上限。(Integer,默认:8)spring-doc.cadn.net.cn

max-wait

连接池耗尽时,分配连接应阻塞的最大时间(以抛出异常)。使用负值表示无限期阻塞。(持续时间,默认:-1ms)spring-doc.cadn.net.cn

min-idle

池中要维持的最少空闲连接数的目标。只有当此设置和驱逐运行之间的间隔都为正时,才会产生效果。(整数,默认:0)spring-doc.cadn.net.cn

time-between-eviction-runs

空闲对象驱逐线程运行的时间间隔。如果为正数,则启动空闲对象驱逐线程,否则不执行任何空闲对象清除操作。(持续时间,默认:<none>)spring-doc.cadn.net.cn

spring.data.redis.lettuce
shutdown-timeout

关闭超时时间。(持续时间,默认:100msspring-doc.cadn.net.cn

spring.data.redis.sentinel
master

Redis服务器的名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

节点

用冒号分隔的“主机:端口”对列表。(List<String>, 默认:<none>)spring-doc.cadn.net.cn

密码

用于与哨兵(sentinel)进行身份验证的密码。(字符串,默认:<none>spring-doc.cadn.net.cn

用户名

用于与哨兵(s)进行身份验证的登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.redis.ssl
捆绑

SSL捆绑包名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

启用

是否启用SSL支持。如果提供了"bundle",则会自动启用,除非另外指定。(布尔值,默认:<none>)spring-doc.cadn.net.cn

spring.datasource
driver-class-name

JDBC 驱动程序的完全限定名称。默认情况下根据 URL 自动检测。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

数据库登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

url

数据库的 JDBC URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

数据库登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

6.2. 桥接处理器

一个处理器,通过简单地将传入的有效负载传递到出站来连接输入和输出。spring-doc.cadn.net.cn

6.2.1. 负载

6.3 过滤器处理器

过滤器处理器允许应用程序检查传入的有效载荷,然后对其应用谓词以决定是否需要继续记录。 例如,如果传入的有效载荷类型为 String 并且您想筛选掉少于五个字符的内容,则可以按如下方式运行过滤器处理器。spring-doc.cadn.net.cn

java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4spring-doc.cadn.net.cn

如果你想将其运行在 RabbitMQ 上,请将 Kafka 更改为 Rabbit。spring-doc.cadn.net.cn

6.3.1. 负载

您可以传递任何类型的负载,然后对其应用SpEL表达式进行过滤。如果传入的类型是byte[]且内容类型设置为text/plainapplication/json,则应用程序会将byte[]转换为Stringspring-doc.cadn.net.cn

6.3.2. 选项

filter.function.expression

针对请求消息进行过滤的布尔型SpEL表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

6.4. Groovy 处理器

一个处理器,用于在消息上应用 Groovy 脚本。spring-doc.cadn.net.cn

6.4.1. 选项

groovy-processor 处理器具有以下选项:spring-doc.cadn.net.cn

groovy-processor.script

指向用于处理消息的脚本的引用。(资源,缺省:<none>)spring-doc.cadn.net.cn

groovy-processor.variables

变量绑定作为以换行符分隔的名称-值对字符串,例如 'foo=bar baz=car'。(属性,默认:<none>)spring-doc.cadn.net.cn

groovy-processor.variables-location

包含自定义脚本变量绑定的属性文件的位置。(资源,缺省:<none>)spring-doc.cadn.net.cn

6.5. Header Enricher Processor

使用 header-enricher 应用添加消息头。spring-doc.cadn.net.cn

标题以换行符分隔的键值对形式提供,其中键是标题名称,值为SpEL表达式。
例如 --headers='foo=payload.someProperty \n bar=payload.otherProperty'spring-doc.cadn.net.cn

6.5.1. 选项

带有 header-enricher 处理器具有以下选项:spring-doc.cadn.net.cn

header.enricher.headers

换行分隔的属性表示标题,其中值是SpEL表达式,例如 foo='bar' baz=payload.baz。 (属性,默认:<none>)spring-doc.cadn.net.cn

header.enricher.overwrite

设置为 true 可覆盖任何现有的消息头。(布尔类型,默认:false)spring-doc.cadn.net.cn

6.6. Http 请求处理器

一个处理器应用程序,该程序向HTTP资源发出请求,并将响应正文作为消息有效负载进行发布。spring-doc.cadn.net.cn

6.6.1. 输入

headers

任何必需的 HTTP 头部必须通过 headersheaders-expression 属性显式设置。请参见下面的示例。
头部值也可用于构造:spring-doc.cadn.net.cn

负载

默认情况下,负载用作 POST 请求的请求体,并且可以是任何 Java 类型。对于 GET 请求,它应该是一个空字符串。负载还可以用于构建:spring-doc.cadn.net.cn

底层的WebClient支持Jackson JSON序列化,以支持任何请求和响应类型。expected-response-type属性,默认为String.class,可以设置为您应用程序类路径中的任何类。请注意,用户定义的有效负载类型需要向您的pom文件添加所需的依赖项。spring-doc.cadn.net.cn

6.6.2. 输出

headers

没有HTTP消息头被映射到出站Message。spring-doc.cadn.net.cn

负载

原始输出对象是ResponseEntity<?>,其任何字段(例如:bodyheaders)或访问器方法(statusCode)都可以作为reply-expression的一部分引用。默认情况下,传出消息的有效负载是响应体。请注意,Jackson 默认无法反序列化 ResponseEntity(由表达式 #root 引用),但可以将其渲染为HashMapspring-doc.cadn.net.cn

6.6.3. 选项

The http-request processor has the following options:spring-doc.cadn.net.cn

6.6.4. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

http.request
body-expression

从传入消息中推导请求体的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

expected-response-type

用于解释响应的类型。(Class<?>,默认:<none>spring-doc.cadn.net.cn

headers-expression

用于推导要使用的 HTTP 头部映射的 SpEL 表达式。 (表达式,默认:<none>spring-doc.cadn.net.cn

http-method-expression

从传入的消息推导请求方法的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

reply-expression

用于计算最终结果的SpEL表达式,应用于整个http {@link org.springframework.http.ResponseEntity}。 (表达式,默认:<none>)spring-doc.cadn.net.cn

timeout

请求超时时间(毫秒)。(Long,默认:30000spring-doc.cadn.net.cn

url-expression

对传入消息进行SpEL表达式计算以确定要使用的URL。(表达式,默认:<none>)spring-doc.cadn.net.cn

spring.codec
log-request-details

是否在调试级别记录表单数据,在跟踪级别记录头部信息。(布尔类型,默认:false)spring-doc.cadn.net.cn

max-in-memory-size

输入流需要聚合时可以缓冲的字节数限制。这仅适用于自动配置的 WebFlux 服务器和 WebClient 实例。默认情况下未设置,此时适用各个编解码器的默认值。大多数编解码器默认限制为 256K。(DataSize,默认:<none>)spring-doc.cadn.net.cn

6.7. 图像识别处理器

一个使用Inception模型对实时图像进行分类的处理器,可以将图像分为不同的类别(例如标签)。spring-doc.cadn.net.cn

模型实现了一个深度的卷积神经网络,可以在困难的视觉识别任务上达到合理性能- 匹配或超过某些领域(如图像识别)中的人类表现。spring-doc.cadn.net.cn

模型的输入是二进制数组形式的图像。spring-doc.cadn.net.cn

输出为以下格式的JSON消息:spring-doc.cadn.net.cn

{
  "labels" : [
     {"giant panda":0.98649305}
  ]
}

Result 包含识别类别的名称(例如 label)以及该图像代表此类别的置信度(例如 confidence)。spring-doc.cadn.net.cn

如果将response-seize设置为高于1的值,则结果将包括最可能的前response-seize个标签。例如,response-size=3将返回:spring-doc.cadn.net.cn

{
  "labels": [
    {"giant panda":0.98649305},
    {"badger":0.010562794},
    {"ice bear":0.001130851}
  ]
}

6.7.1. 负载

如果传入类型为 byte[] 且内容类型设置为 application/octet-stream,则应用程序将输入的 byte[] 图像处理并输出增强后的 byte[] 图像负载和 JSON 标头。spring-doc.cadn.net.cn

6.7.2. 选项

image.recognition.cache-model

缓存预训练的 TensorFlow 模型。(布尔类型,默认:true)spring-doc.cadn.net.cn

image.recognition.debug-output

<文档缺失> (布尔值,默认: false)spring-doc.cadn.net.cn

image.recognition.debug-output-path

<文档缺失> (字符串,默认:image-recognition-result.pngspring-doc.cadn.net.cn

image.recognition.model

预训练的TensorFlow图像识别模型。请注意,该模型必须与所选的模型类型匹配!(字符串,默认:https://storage.googleapis.com/mobilenet_v2/checkpoints/mobilenet_v2_1.4_224.tgz#mobilenet_v2_1.4_224_frozen.pb)spring-doc.cadn.net.cn

image.recognition.model-type

支持三种不同的预训练 tensorflow 图像识别模型:inception、mobilenetv1 和 mobilenetv2。1. inception图使用“input”作为输入,“output”作为输出。2. mobilenetv2 预训练模型:https://github.com/tensorflow/models/tree/master/research/slim/nets/mobilenet#pretrained-models - 标准化图像尺寸始终为正方形(例如 h=w)- 图使用“input”作为输入和“mobilenetv2/predictions/reshape_1”作为输出。3. mobilenetv1 预训练模型:https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet_v1.md#pre-trained-models - 图使用“input”作为输入和“mobilenetv1/predictions/reshape_1”作为输出。(modeltype,默认:<none>,可能值:inceptionmobilenetv1mobilenetv2)spring-doc.cadn.net.cn

image.recognition.normalized-image-size

归一化图像大小。(整数,默认:224)spring-doc.cadn.net.cn

image.recognition.response-size

已识别图片的数量。(整数,默认值:5)spring-doc.cadn.net.cn

6.8. 对象检测处理器

对象检测处理器提供开箱即用的 TensorFlow 对象检测 API 支持。它允许在单个图像或图像流中实时定位和识别多个对象。对象检测处理器是基于 对象检测功能 构建的。spring-doc.cadn.net.cn

您必须向处理器提供一个预训练的目标检测模型,以及相应的对象标签spring-doc.cadn.net.cn

这里是一些合理的配置默认值:spring-doc.cadn.net.cn

下图显示了一个 Spring Cloud 数据流,用于实时预测输入图像流中的对象类型。spring-doc.cadn.net.cn

scdf tensorflow object detection arch

处理器的输入是一个图像字节数组,输出是增强后的图像和一个标题,称为detected_objects,它提供了检测到的对象的文字描述:spring-doc.cadn.net.cn

{
  "labels" : [
     {"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
     {"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
     {"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
     {"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
  ]
}

0 头格式是:
spring-doc.cadn.net.cn

  • 对象名称:置信度 - 检测到的对象的人类可读名称(例如,标签)及其置信度,置信度为[0-1]之间的浮点数spring-doc.cadn.net.cn

  • x1y1x2y2 - 响应还提供了检测到的对象的边界框,表示为 (x1, y1, x2, y2)。坐标相对于图像大小进行相对定位。spring-doc.cadn.net.cn

  • cid-分类标识符,如所提供的标签配置文件中定义。spring-doc.cadn.net.cn

6.8.1. 负载

传入的类型是 byte[],内容类型是 application/octet-stream。处理器处理输入的 byte[] 图像,并输出一个增强的 byte[] 图像负载和一个 JSON 头部(detected_objects)。spring-doc.cadn.net.cn

6.8.2. 选项

object.detection.cache-model

<文档缺失> (布尔值,默认: true)spring-doc.cadn.net.cn

object.detection.confidence

<文档缺失> (浮点数,默认:0.4)spring-doc.cadn.net.cn

object.detection.debug-output

<文档缺失> (布尔值,默认: false)spring-doc.cadn.net.cn

object.detection.debug-output-path

<文档缺失> (字符串,默认:object-detection-result.pngspring-doc.cadn.net.cn

object.detection.labels

标签 URI。 (字符串,默认:https://storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt)spring-doc.cadn.net.cn

object.detection.model

预训练的 TensorFlow 对象检测模型。(字符串,默认:https://download.tensorflow.org/models/object_detection/ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz#frozen_inference_graph.pb)spring-doc.cadn.net.cn

object.detection.response-size

<documentation missing> (Integer,默认:<none>spring-doc.cadn.net.cn

object.detection.with-masks

<文档缺失> (布尔值,默认: false)spring-doc.cadn.net.cn

6.9. 语义分割处理器

基于最先进的DeepLab TensorFlow模型的图像语义分割。spring-doc.cadn.net.cn

图像分割是将图像中的每个像素与一个类别标签(如花朵、人、道路、天空、海洋或汽车)关联起来的过程。Semantic Segmentation不同于Instance Segmentation,后者生成实例感知的区域掩码,而Semantic Segmentation则生成类别感知的掩码。要实现Instance Segmentation,请参阅目标检测服务spring-doc.cadn.net.cn

Semantic Segmentation Processor 使用 语义分割功能 库和 TensorFlow 服务spring-doc.cadn.net.cn

6.9.1. 负载

传入的类型是 byte[],内容类型是 application/octet-stream。处理器处理输入的 byte[] 图像,并输出增强的 byte[] 图像负载和 json 头。spring-doc.cadn.net.cn

处理器的输入是图像字节数组,输出是增强后的图像字节数组,并且包含一个JSON标头semantic_segmentation,格式如下:spring-doc.cadn.net.cn

[
    [ 0, 0, 0 ],
    [ 127, 127, 127 ],
    [ 255, 255, 255 ],
    ...
]

输出头的json格式表示从输入图像计算出的颜色像素图。spring-doc.cadn.net.cn

6.9.2. 选项

semantic.segmentation.color-map-uri

每个预训练模型都基于某些对象颜色映射。预定义的选项有:- classpath:/colormap/citymap_colormap.json - classpath:/colormap/ade20k_colormap.json - classpath:/colormap/black_white_colormap.json - classpath:/colormap/mapillary_colormap.json (字符串,默认:classpath:/colormap/citymap_colormap.json)spring-doc.cadn.net.cn

semantic.segmentation.debug-output

将输出图像保存在本地 debugOutputPath 路径下。(布尔类型,默认:falsespring-doc.cadn.net.cn

semantic.segmentation.debug-output-path

<文档缺失> (字符串,默认:semantic-segmentation-result.pngspring-doc.cadn.net.cn

semantic.segmentation.mask-transparency

计算分割掩码图像的alpha颜色。(浮点数,默认:0.45)spring-doc.cadn.net.cn

semantic.segmentation.model

预训练的TensorFlow语义分割模型。(字符串,默认:https://download.tensorflow.org/models/deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz#frozen_inference_graph.pb)spring-doc.cadn.net.cn

semantic.segmentation.output-type

指定输出图像类型。您可以返回带有计算掩码叠加的输入图像,或者仅返回掩码。(OutputType,默认值:<none>,可选值:blendedmask)spring-doc.cadn.net.cn

6.10. 脚本处理器

使用脚本转换消息的处理器。脚本正文作为属性值直接提供。可以指定脚本语言(groovy/javascript/ruby/python)。spring-doc.cadn.net.cn

6.10.1. 选项

脚本处理器有以下选项:script-processorspring-doc.cadn.net.cn

script-processor.language

脚本属性中文本的语言。支持的类型为:groovy、javascript、ruby、python。(字符串,默认值:<none>spring-doc.cadn.net.cn

script-processor.script

脚本文本。(字符串,默认:<none>)spring-doc.cadn.net.cn

script-processor.variables

变量绑定作为以换行符分隔的名称-值对字符串,例如 'foo=bar baz=car'。(属性,默认:<none>)spring-doc.cadn.net.cn

script-processor.variables-location

包含自定义脚本变量绑定的属性文件的位置。(资源,缺省:<none>)spring-doc.cadn.net.cn

6.11. 分割处理器

拆分器应用程序建立在Spring Integration中的同名概念之上,允许将单个消息拆分为多个不同的消息。
处理器使用一个函数作为输入Message<?>,然后根据各种属性产生输出List<Message<?>(见下文)。
您可以使用SpEL表达式或定界符来指定如何拆分传入的消息。spring-doc.cadn.net.cn

6.11.1. 负载

如果传入的类型是 byte[] 并且内容类型设置为 text/plainapplication/json,则应用程序会将 byte[] 转换为 Stringspring-doc.cadn.net.cn

6.11.2. 选项

splitter.apply-sequence

在标头中添加关联/序列信息,以方便稍后的聚合。(布尔值,默认:true)spring-doc.cadn.net.cn

splitter.charset

在将基于文本的文件中的字节转换为字符串时要使用的字符集。(String,默认:<none>)spring-doc.cadn.net.cn

splitter.delimiters

当表达式为 null 时,用于对 {@link String} 负载进行标记化的分隔符。 (字符串,默认值:<none>)spring-doc.cadn.net.cn

splitter.expression

用于拆分有效负载的 SpEL 表达式。(字符串,默认值:<none>)spring-doc.cadn.net.cn

splitter.file-markers

设置为 true 或 false,以使用一个 {@code FileSplitter}(按行拆分基于文本的文件)来包含(或不包含)文件开头/结尾标记。(布尔型,默认:<none>spring-doc.cadn.net.cn

splitter.markers-json

当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:true)spring-doc.cadn.net.cn

6.12. 转换处理器

转换器处理器允许您基于SpEL表达式来转换消息负载结构。spring-doc.cadn.net.cn

以下是运行此应用程序的示例。spring-doc.cadn.net.cn

java -jar transform-processor-kafka-<version>.jar \
    --spel.function.expression=payload.toUpperCase()

如果你想将其运行在 RabbitMQ 上,请将 Kafka 更改为 Rabbit。spring-doc.cadn.net.cn

6.12.1. 负载

传入的消息可以包含任何类型的负载。spring-doc.cadn.net.cn

6.12.2. 选项

spel.function.expression

要应用的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

6.13. Twitter趋势和趋势位置处理器

能够返回热门话题或热门话题位置的处理器。
twitter.trend.trend-query-type属性允许选择查询类型。spring-doc.cadn.net.cn

6.13.1. 获取某个位置的趋势话题(可选)

在此模式下,将twitter.trend.trend-query-type设置为trendspring-doc.cadn.net.cn

基于趋势API的处理器。返回特定纬度、经度位置附近的流行话题spring-doc.cadn.net.cn

6.13.2. 获取趋势位置

在此模式下,将twitter.trend.trend-query-type设置为trendLocationspring-doc.cadn.net.cn

通过位置检索热门话题的完整列表或附近的位置列表。spring-doc.cadn.net.cn

如果未提供latitudelongitude参数,则处理器执行可用趋势API,并返回Twitter具有话题趋势信息的位置。spring-doc.cadn.net.cn

如果提供了latitudelongitude参数,处理器会执行趋势最近API并返回Twitter具有趋势话题信息的位置,这些位置最接近指定的位置。spring-doc.cadn.net.cn

响应是一个 locations 数组,用于编码位置的 WOEID 和其他人类可读信息,例如规范名称和该位置所属国家。spring-doc.cadn.net.cn

6.13.3. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

twitter.trend.closest
纬度

如果提供了长参数,可用趋势位置将以距离坐标对的距离排序,从最近到最远。经度的有效范围是 -180.0 到 +180.0(西为负,东为正),包含边界。(表达式,默认:<none>)spring-doc.cadn.net.cn

如果提供了 lat 参数,则可用的趋势位置将按照距离排序,最近到最远,对应坐标对。经度的有效范围为 -180.0 到 +180.0(西为负,东为正),包括端点。(表达式,默认:<none>)spring-doc.cadn.net.cn

twitter.trend
location-id

要返回趋势信息的位置的 Yahoo! 地方地球 ID。全球信息可使用 1 作为 WOEID 获取。(表达式,默认:payload)spring-doc.cadn.net.cn

trend-query-type

<文档缺失> (趋势查询类型,默认值: <none>,可能的值: trend,trendLocation)spring-doc.cadn.net.cn

7. 消息通道

7.1. Cassandra Sink(Cassandra 接收器)

此接收应用程序将接收到的每个消息的内容写入Cassandra。spring-doc.cadn.net.cn

它期望一个JSON字符串作为负载,并使用其属性来映射到表列。spring-doc.cadn.net.cn

7.1.1. 负载

表示要持久化的实体(或实体列表)的 JSON 字符串或字节数组。spring-doc.cadn.net.cn

7.1.2. 选项

cassandra接收器具有以下选项:
spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

cassandra.cluster
create-keyspace

应用程序启动时创建(或不创建)keyspace的标志。(布尔值,默认:falsespring-doc.cadn.net.cn

entity-base-packages

要扫描实体注解的基包,这些实体带有Table注解。(String[],默认:[])spring-doc.cadn.net.cn

init-script

用于初始化keyspace模式的CQL脚本(由';'分隔)(资源,默认:<none>spring-doc.cadn.net.cn

skip-ssl-validation

验证服务器SSL证书的标志。(布尔值,默认:false)spring-doc.cadn.net.cn

Cassandra
consistency-level

写操作的一致性级别。(ConsistencyLevel,默认:<none>)spring-doc.cadn.net.cn

ingest-query

导入Cassandra查询。(字符串,默认:<none>)spring-doc.cadn.net.cn

query-type

用于 Cassandra Sink 的查询类型。 (类型,缺省值:<none>,有效值:INSERTUPDATEDELETESTATEMENT)spring-doc.cadn.net.cn

statement-expression

在Cassandra查询DSL风格中的表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

TTL

WriteOptions 的 Time-to-live 选项。(整数,默认:0spring-doc.cadn.net.cn

spring.cassandra
压缩

通过Cassandra二进制协议支持的压缩。(压缩,默认:none,可能值:LZ4SNAPPYNONEspring-doc.cadn.net.cn

配置

要使用的配置文件的位置。(资源,默认:<none>)spring-doc.cadn.net.cn

contact-points

集群节点地址,格式为“host:port”,或者仅使用配置端口的简单“host”。(List<String>, 默认:[127.0.0.1:9042])spring-doc.cadn.net.cn

keyspace-name

要使用的 Keyspace 名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

local-datacenter

被视为“本地”的数据中心。联系点应来自此数据中心。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

服务器的登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

如果联系点未指定,则使用的端口。(整数,默认:9042)spring-doc.cadn.net.cn

schema-action

启动时要采取的架构操作。(字符串,默认:none)spring-doc.cadn.net.cn

session-name

命名 Cassandra 会话。(字符串,默认值:<none>spring-doc.cadn.net.cn

用户名

服务器登录用户。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.cassandra.connection
connect-timeout

建立驱动连接时使用的超时时间。(持续时间,默认:5s)spring-doc.cadn.net.cn

init-query-timeout

在连接打开后初始化过程中运行的内部查询使用的超时时间。(持续时间,默认:5s)spring-doc.cadn.net.cn

spring.cassandra.controlconnection
timeout

用于控制查询的超时时间。(持续时间,默认:5s)spring-doc.cadn.net.cn

spring.cassandra.pool
heartbeat-interval

在空闲连接上发送消息以确保其仍然存活的心跳间隔。(持续时间,默认:30s)spring-doc.cadn.net.cn

idle-timeout

空闲连接被移除前的超时时间。(持续时间,默认:5s)spring-doc.cadn.net.cn

spring.cassandra.request
一致性

查询一致性级别。(默认一致性级别,缺省值:<none>,可选值:ANYONETWOTHREEQUORUMALLLOCAL_ONELOCAL_QUORUMEACH_QUORUMSERIALLOCAL_SERIAL)spring-doc.cadn.net.cn

page-size

单次网络往返传输将同时检索多少行。(整数,默认:5000spring-doc.cadn.net.cn

serial-consistency

查询序列一致性级别。(DefaultConsistencyLevel,缺省值:<none>,可选值:ANYONETWOTHREEQUORUMALLLOCAL_ONELOCAL_QUORUMEACH_QUORUMSERIALLOCAL_SERIAL)spring-doc.cadn.net.cn

timeout

驱动程序等待请求完成的时间长度。(持续时间,默认:2s)spring-doc.cadn.net.cn

spring.cassandra.request.throttler
drain-interval

节流器尝试从队列中移除请求的频率。设置为足够高的值,使得每次尝试都会处理多个队列条目,但又不会延迟过多的请求。(持续时间,默认:<none>)spring-doc.cadn.net.cn

max-concurrent-requests

允许并行执行的最大请求数量。(整数,默认:<none>)spring-doc.cadn.net.cn

max-queue-size

当限流阈值被超过时,可以排队的最大请求数量。(整数,默认:<none>)spring-doc.cadn.net.cn

max-requests-per-second

最大允许的请求速率。(整数,默认:<none>)spring-doc.cadn.net.cn

类型

请求限流类型。(ThrottlerType,默认:none,可选值:CONCURRENCY_LIMITINGRATE_LIMITINGNONE)spring-doc.cadn.net.cn

spring.cassandra.ssl
捆绑

SSL捆绑包名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

启用

是否启用SSL支持。(布尔类型,默认:<none>spring-doc.cadn.net.cn

7.2. 分析接收器

基于分析消费者构建的接收应用程序,该程序从输入消息中计算分析,并将分析结果作为指标发布到各种监控系统。它利用micrometer库在最流行的监控系统上提供统一的编程体验,并公开Spring表达式语言(SpEL)属性,用于定义如何从输入数据中计算度量名称、值和标签。spring-doc.cadn.net.cn

分析接收器可以生成两种指标类型:spring-doc.cadn.net.cn

一个 计量器(例如计数器或仪表)由其唯一的 namedimensions 确定(术语维度和标签可以互换使用)。维度允许对特定命名指标进行切片,以便深入分析和推理数据。spring-doc.cadn.net.cn

由于指标由其namedimensions唯一标识,因此您可以为每个指标分配多个标签(例如键/值对),但之后不能随意更改这些标签!如果具有相同名称的指标具有不同的标签集,则监控系统(如Prometheus)会发出警告。

使用analytics.nameanalytics.name-expression属性设置输出分析指标的名称。如果没有设置,指标名称默认为应用程序的名称。spring-doc.cadn.net.cn

使用analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>属性可以向您的指标添加一个或多个标签。TAG_NAME在属性定义中将作为标签名称出现在指标中。TAG_VALUE是SpEL表达式,该表达式从传入的消息动态计算出标签值。spring-doc.cadn.net.cn

表达式使用SpELheaders关键字来访问消息的头和有效载荷值。spring-doc.cadn.net.cn

您可以使用字面量(例如 'fixed value')来设置具有固定值的标签。

所有流应用程序开箱即用地支持三种最流行的监控系统,WavefrontPrometheusInfluxDB,您可以通过声明方式启用每种系统。通过只需向 Analytics Sink 应用程序添加它们的 Micrometer Meter-Registry 依赖项即可添加对其他监控系统的支持。spring-doc.cadn.net.cn

请访问 Spring Cloud Data Flow 流监控 获取配置监控系统的详细说明。以下快速片段可帮助您开始。spring-doc.cadn.net.cn

management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
如果启用了数据流服务器监控,则Analytics Sink将重用提供的度量配置。

下图说明了Analytics Sink如何帮助收集股票交易所和实时数据管道的业务内部信息。spring-doc.cadn.net.cn

Analytics Architecture

7.2.1. 负载

传入的消息可以包含任何类型的负载。spring-doc.cadn.net.cn

7.2.2. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

分析
amount-expression

用于计算输出指标值(例如金额)的 SpEL 表达式。默认值为 1.0 (表达式,默认:<none>)spring-doc.cadn.net.cn

meter-type

用于向后端报告指标的 Micrometer 计量类型。(MeterType,默认值:<none>,可能的取值:countergauge)spring-doc.cadn.net.cn

姓名

输出指标的名称。'name' 和 'nameExpression' 相互排斥,只能设置其中一个。(字符串,默认:<none>)spring-doc.cadn.net.cn

name-expression

用于从输入消息计算输出指标名称的SpEL表达式。'name' 和 'nameExpression' 是互斥的,只能设置其中一个。(表达式,默认:<none>)spring-doc.cadn.net.cn

analytics.tag
表达式

从 SpEL 表达式计算标签。单个 SpEL 表达式可以生成值数组,这意味着不同的名称/值标签。每个名称/值标签都会产生单独的计量器增量。标签表达式格式是:analytics.tag.expression.[tag-name]=[SpEL 表达式] (Map<String, Expression>, 默认: <none>)spring-doc.cadn.net.cn

固定

已弃用:请使用 analytics.tag.expression 并带有字面量 SpEL 表达式。自定义固定标签。这些标签具有常数值,创建一次后会随每次发布的指标一起发送。定义固定标签的约定是:<code>analytics.tag.fixed.[tag-name]=[tag-value]</code>(Map<String, String>, 默认值:<none>)spring-doc.cadn.net.cn

7.3. Elasticsearch 写入器

将文档索引到Elasticsearch中的接收器。<br>spring-doc.cadn.net.cn

此Elasticsearch接收器仅支持索引JSON文档。
它从输入目标获取数据,然后将其索引到Elasticsearch中。
输入数据可以是普通的json字符串,或者表示JSON的java.util.Map
它还接受作为Elasticsearch提供的XContentBuilder的数据。
然而,在中间件不将记录保存为XContentBuilder的情况下,这种情况很少见。
这主要是为了直接调用消费者而提供的。spring-doc.cadn.net.cn

7.3.1. 选项

Spring 框架的 Elasticsearch 接收器具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

elasticsearch.consumer
异步

指示索引操作是异步还是同步。默认情况下,索引操作是同步进行的。(布尔值,默认:false)spring-doc.cadn.net.cn

batch-size

每次请求要索引的项目数量。默认值为1。如果设置大于1,则会使用批量索引API。(整数,默认:1)spring-doc.cadn.net.cn

group-timeout

批量索引激活时,在超时毫秒后将消息组刷新。默认值为-1,表示不会自动刷新空闲的消息组。(Long,默认:-1)spring-doc.cadn.net.cn

id

要索引的文档的id。如果设置了,则每条消息的INDEX_ID报头值将覆盖此属性。(表达式,默认:<none>)spring-doc.cadn.net.cn

索引

索引名称。如果设置,则每个消息的 INDEX_NAME 头值将覆盖此属性。(字符串,默认:<none>)spring-doc.cadn.net.cn

路由

指示要路由到的分片。如果没有提供,Elasticsearch 将默认使用文档 ID 的哈希。(字符串,默认:<none>)spring-doc.cadn.net.cn

timeout-seconds

分片可用的超时时间。如果未设置,则由Elasticsearch客户端默认设为1分钟。(Long,默认:0)spring-doc.cadn.net.cn

spring.elasticsearch
connection-timeout

与Elasticsearch通信时使用的连接超时时间。(持续时间,默认:1sspring-doc.cadn.net.cn

密码

用于与Elasticsearch进行身份验证的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

path-prefix

添加到发送给Elasticsearch的每个请求路径前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

socket-keep-alive

是否启用客户端与Elasticsearch之间的套接字保持活动。 (布尔类型,默认:false)spring-doc.cadn.net.cn

socket-timeout

与Elasticsearch通信时使用的套接字超时时间。(持续时间,默认:30s)spring-doc.cadn.net.cn

统一资源标识符(URIs)

用作逗号分隔的 Elasticsearch 实例列表。 (List<String>, 默认值: [http://localhost:9200])spring-doc.cadn.net.cn

用户名

用于与Elasticsearch进行身份验证的用户名。(字符串,默认值:<none>)spring-doc.cadn.net.cn

spring.elasticsearch.restclient.sniffer
delay-after-failure

在发生故障后安排嗅探执行的延迟时间。(持续时间,默认:1m)spring-doc.cadn.net.cn

间隔

连续普通嗅探执行之间的间隔。(持续时间,默认:5mspring-doc.cadn.net.cn

spring.elasticsearch.restclient.ssl
捆绑

SSL捆绑包名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.3.2. 运行此接收器的示例

  1. 从文件夹 elasticsearch-sink./mvnw clean packagespring-doc.cadn.net.cn

  2. cd appsspring-doc.cadn.net.cn

  3. 切换到正确的绑定器生成的应用程序(Kafka 或 RabbitMQ)spring-doc.cadn.net.cn

  4. ./mvnw clean packagespring-doc.cadn.net.cn

  5. 请确保您已运行Elasticsearch。例如,您可以使用以下命令将其作为docker容器运行。docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2spring-doc.cadn.net.cn

  6. 如果中间件(Kafka 或 RabbitMQ)尚未运行,请先启动它。spring-doc.cadn.net.cn

  7. java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testingspring-doc.cadn.net.cn

  8. 将一些JSON数据发送到中间件目的地。例如:{"foo":"bar"}spring-doc.cadn.net.cn

  9. 验证数据是否已索引:curl localhost:9200/testing/_searchspring-doc.cadn.net.cn

7.4. 文件接收器

文件接收应用程序将接收到的每个消息写入到一个文件中。spring-doc.cadn.net.cn

7.4.1. 负载

7.4.2. 选项

代码file-sink有以下选项:spring-doc.cadn.net.cn

file.consumer.binary

一个标志,用于指示是否应抑制在写入后添加换行符。(布尔值,默认:false)spring-doc.cadn.net.cn

file.consumer.charset

写入文本内容时使用的字符集。(字符串,默认:UTF-8)spring-doc.cadn.net.cn

file.consumer.directory

目标文件的父目录。(File,缺省:<none>)spring-doc.cadn.net.cn

file.consumer.directory-expression

要评估的目标文件的父目录表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

file.consumer.mode

如果目标文件已存在,则要使用的 FileExistsMode。(FileExistsMode,默认值:<none>,可选值:APPENDAPPEND_NO_FLUSHFAILIGNOREREPLACEREPLACE_IF_MODIFIED)spring-doc.cadn.net.cn

file.consumer.name

目标文件的名称。(字符串,默认:file-consumer)spring-doc.cadn.net.cn

file.consumer.name-expression

要计算的目标文件名的表达式。(字符串,默认值:<none>)spring-doc.cadn.net.cn

file.consumer.suffix

文件名后缀。 (字符串,默认:<empty string>)spring-doc.cadn.net.cn

7.5. FTP 接收器

FTP接收器是从传入的消息推送到FTP服务器的简单选项。spring-doc.cadn.net.cn

它使用ftp-outbound-adapter,因此传入的消息可以是java.io.File对象、String(文件内容)或bytes数组(文件内容也是这样)。spring-doc.cadn.net.cn

要使用此接收器,您需要一个用户名和密码来登录。spring-doc.cadn.net.cn


默认情况下,如果未指定,则 Spring Integration 将使用 o.s.i.file.DefaultFileNameGenerator。如果DefaultFileNameGenerator基于file_name标头(如果存在)的值来确定文件名,在MessageHeaders中,或者如果Message的有效载荷已经是java.io.File,则它会
使用该文件的原始名称。

7.5.1. 头部

7.5.2. 负载

7.5.3. 输出

不适用(写入FTP服务器)。spring-doc.cadn.net.cn

7.5.4. 可选项

FTP 接收器 具有以下选项:
spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

ftp.consumer
auto-create-dir

是否创建远程目录。(布尔值,默认:true)spring-doc.cadn.net.cn

filename-expression

用于生成远程文件名的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

模式

远程文件已存在时要采取的操作。(FileExistsMode,默认:<none>,可选值:APPENDAPPEND_NO_FLUSHFAILIGNOREREPLACEREPLACE_IF_MODIFIED)spring-doc.cadn.net.cn

remote-dir

远程 FTP 目录。(字符串,默认:/spring-doc.cadn.net.cn

remote-file-separator

远程文件分隔符。(字符串,默认:/)spring-doc.cadn.net.cn

temporary-remote-dir

如果 '#isUseTemporaryFilename()' 为 true,则文件将被写入的临时目录。(字符串,默认:/)spring-doc.cadn.net.cn

tmp-file-suffix

传输进行期间使用的后缀。(字符串,默认:.tmp)spring-doc.cadn.net.cn

use-temporary-filename

是否写入临时文件并重命名。(布尔类型,默认:true)spring-doc.cadn.net.cn

ftp.factory
cache-sessions

缓存会话。 (布尔值,默认:<none>spring-doc.cadn.net.cn

client-mode

FTP会话使用的客户端模式。(ClientMode,默认:<none>,可选值:ACTIVEPASSIVE)spring-doc.cadn.net.cn

主机

服务器的主机名。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

连接到服务器时使用的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

服务器的端口。(整数,默认:21)spring-doc.cadn.net.cn

用户名

连接服务器时要使用的用户名。(字符串,默认值:<none>)spring-doc.cadn.net.cn

7.6. JDBC Sink

JDBC接收器允许您将传入的有效负载持久保存到关系型数据库管理系统(RDBMS)数据库中。spring-doc.cadn.net.cn

jdbc.consumer.columns 属性表示 COLUMN_NAME[:EXPRESSION_FOR_VALUE] 的键值对,其中 EXPRESSION_FOR_VALUE(连同冒号)是可选的。 在这种情况下,该值通过生成的表达式(如 payload.COLUMN_NAME)进行评估,因此我们可以通过这种方式实现从对象属性到表列的直接映射。 例如,我们有一个如下所示的 JSON 负载:spring-doc.cadn.net.cn

{
  "name": "My Name",
  "address": {
     "city": "Big City",
     "street": "Narrow Alley"
  }
}

因此,我们可以使用以下配置将数据插入到具有 namecitystreet 结构的表中:spring-doc.cadn.net.cn

--jdbc.consumer.columns=name,city:address.city,street:address.street

此接收器支持批量插入,具体取决于底层的 JDBC 驱动程序。 通过 batch-sizeidle-timeout 属性配置批量插入: 当接收到的消息达到 batch-size 条时进行聚合,并作为一批次插入。 如果在没有新消息的情况下经过了 idle-timeout 毫秒,则即使批次大小小于 batch-size,也会插入已聚合的数据,以此来限制最大延迟。spring-doc.cadn.net.cn

该模块还使用了 Spring Boot 的 DataSource 支持 来配置数据库连接,因此像 spring.datasource.url 等属性也适用。

7.6.1. 示例

java -jar jdbc-sink.jar --jdbc.consumer.tableName=names \
            --jdbc.consumer.columns=name \
            --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
            --spring.datasource.url='jdbc:mysql://localhost:3306/test

7.6.2. Payload

7.6.3. 选项

jdbc接收器有以下选项:jdbcspring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

jdbc.consumer
batch-size

当消息数量达到阈值时,数据将被刷新到数据库表中。(整数,默认:1)spring-doc.cadn.net.cn

逗号分隔的基于冒号的列名和 SpEL 表达式对,用于插入/更新值。在初始化时使用这些名称来发布 DDL。(字符串,默认:payload:payload.toString())spring-doc.cadn.net.cn

idle-timeout

数据自动刷新到数据库表时的空闲超时时间(毫秒)。(Long,默认:-1)spring-doc.cadn.net.cn

初始化

'true'、'false' 或表格的自定义初始化脚本的位置。(字符串,默认:false)spring-doc.cadn.net.cn

table-name

要写入的表名。(字符串,默认:messages)spring-doc.cadn.net.cn

spring.datasource
driver-class-name

JDBC 驱动程序的完全限定名称。默认情况下根据 URL 自动检测。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

数据库登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

url

数据库的 JDBC URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

数据库登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.7. Apache Kafka 写入器

此模块向Apache Kafka发布消息。spring-doc.cadn.net.cn

7.7.1. 选项

<strong>kafka</strong>接收器有以下选项:
spring-doc.cadn.net.cn

(参见 Spring for Apache Kafka 的 Spring Boot 配置属性文档)spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

kafka.publisher

Kafka记录键 - 如果提供了keyExpression,则会被覆盖。(字符串,默认:<none>spring-doc.cadn.net.cn

key-expression

一个求值为 Kafka 记录键的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

mapped-headers

要映射的标头。 (String[], 默认值: [*])spring-doc.cadn.net.cn

分区

Kafka主题分区 - 如果提供了 partitionExpression,则会被覆盖。 (整数,默认:<none>)spring-doc.cadn.net.cn

partition-expression

一个评估为 Kafka 主题分区的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

send-timeout

Kafka 生产者处理器应等待发送操作结果的时间。默认为 10 秒。(时间段,缺省:10s)spring-doc.cadn.net.cn

同步

如果Kafka生产者处理器应该在同步模式下操作,则为真。(布尔值,默认:false)spring-doc.cadn.net.cn

时间戳

Kafka记录时间戳 - 如果提供了时间戳表达式,则由其覆盖。 (长整型,默认:<none>spring-doc.cadn.net.cn

timestamp-expression

一个计算为 Kafka 记录时间戳的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

主题

Kafka 主题 - 如果提供了 topicExpression,则会覆盖它。默认为 KafkaTemplate.getDefaultTopic() (字符串,默认:<none>)spring-doc.cadn.net.cn

topic-expression

一个计算结果为 Kafka 主题的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

use-template-converter

是否使用模板的消息转换器来创建 Kafka 记录。(布尔值,默认:false)spring-doc.cadn.net.cn

spring.kafka
bootstrap-servers

用于建立与Kafka集群初始连接的主机:端口对的逗号分隔列表。除非被覆盖,否则适用于所有组件。(List<String>, 默认:<none>)spring-doc.cadn.net.cn

client-id

ID to pass to the server when making requests. Used for server-side logging. (String, default: <none>)spring-doc.cadn.net.cn

属性

用于配置客户端的附加属性,适用于生产者和消费者。(Map<String, String>, 默认值: <none>)spring-doc.cadn.net.cn

spring.kafka.producer
acks

生产者要求领导者在认为请求已完成之前收到的确认数量。(字符串,默认:<none>)spring-doc.cadn.net.cn

batch-size

默认批处理大小。较小的批处理大小会使批处理不那么常见,并可能降低吞吐量(批处理大小为零会完全禁用批处理)。(DataSize,默认:<none>)spring-doc.cadn.net.cn

bootstrap-servers

用于建立与Kafka集群初始连接的主机:端口对的逗号分隔列表。覆盖全局属性,适用于生产者。(List<String>, 默认值: <none>)spring-doc.cadn.net.cn

buffer-memory

生产者可以使用的总内存大小,用于缓冲等待发送到服务器的数据。(数据大小,默认:<none>)spring-doc.cadn.net.cn

client-id

ID to pass to the server when making requests. Used for server-side logging. (String, default: <none>)spring-doc.cadn.net.cn

compression-type

生产者生成的所有数据的压缩类型。(字符串,默认:<none>)spring-doc.cadn.net.cn

key-serializer

用于键序列化的类。(Class<?>, 默认: <none>)spring-doc.cadn.net.cn

属性

用于配置客户端的额外生产者特定属性。(Map<String, String>, 默认: <none>)spring-doc.cadn.net.cn

重试

大于零时,启用失败发送的重试。(Integer,默认:<none>)spring-doc.cadn.net.cn

transaction-id-prefix

非空时,为生产者启用事务支持。(字符串,默认:<none>)spring-doc.cadn.net.cn

value-serializer

值的序列化器类。(Class<?>, 默认:<none>)spring-doc.cadn.net.cn

spring.kafka.template
default-topic

消息发送到的默认主题。 (字符串,默认值:<none>spring-doc.cadn.net.cn

transaction-id-prefix

事务ID前缀,覆盖生产者工厂中的事务ID前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.8. 日志接收器

log 接收器使用应用程序记录器输出数据以供检查。spring-doc.cadn.net.cn

请理解,log 接收器使用无类型的处理器,这会影响实际的日志记录方式。这意味着如果内容类型为文本,则原始负载字节将被转换为字符串;否则将记录原始字节。用户指南中包含更多信息。spring-doc.cadn.net.cn

7.8.1. 参数<br>

日志接收器具有以下选项:logspring-doc.cadn.net.cn

log.expression

要作为日志消息记录的消息上的SpEL表达式。(字符串,默认:payload)spring-doc.cadn.net.cn

log.level

日志消息记录的级别。(级别,默认:<none>,可能值:FATALERRORWARNINFODEBUGTRACE)spring-doc.cadn.net.cn

log.name

要使用的记录器的名称。(字符串,默认值:<none>)spring-doc.cadn.net.cn

7.9. MongoDB Sink

此接收应用程序将传入数据存入MongoDB。
该应用程序完全基于MongoDataAutoConfiguration,因此请参阅Spring Boot MongoDB支持获取更多信息。spring-doc.cadn.net.cn

7.9.1. 输入

7.9.2. 选项

MongoDB 接收器具有以下选项:
spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

mongodb.consumer
集合

存储数据的 MongoDB 集合。(字符串,默认:<none>)spring-doc.cadn.net.cn

collection-expression

要评估 MongoDB 集合的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

spring.data.mongodb
additional-hosts

附加服务器主机。不能与 URI 设置同时使用,也不能在未指定 'host' 的情况下设置。附加主机将使用默认的 MongoDB 端口 27017。如果要使用其他端口,可以使用 "host:port" 语法。(List<String>, 默认值: <none>)spring-doc.cadn.net.cn

authentication-database

认证数据库名称。(字符串,默认:<none>spring-doc.cadn.net.cn

auto-index-creation

是否启用自动索引创建。(布尔类型,默认:<none>)spring-doc.cadn.net.cn

数据库

数据库名。覆盖URI中的数据库。(字符串,默认:<none>spring-doc.cadn.net.cn

field-naming-strategy

要使用的 FieldNamingStrategy 的完全限定名。(Class<?>, 默认:<none>)spring-doc.cadn.net.cn

主机

Mongo服务器主机。不能与URI一起设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

登录 MongoDB 服务器的密码。不能与 URI 一起设置。(字符数组,默认:<none>)spring-doc.cadn.net.cn

端口

Mongo服务器端口。不能与URI一起设置。(Integer,默认:<none>spring-doc.cadn.net.cn

replica-set-name

群集所需的副本集名称。不能与URI一起设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uri

Mongo 数据库连接字符串。此选项会覆盖主机、端口、用户名和密码。(String,默认值:mongodb://localhost/testspring-doc.cadn.net.cn

用户名

登录 MongoDB 服务器的用户。不能与 URI 同时设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uuid-representation

转换 UUID 为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认:java-legacy,可能的值:UNSPECIFIEDSTANDARDC_SHARP_LEGACYJAVA_LEGACYPYTHON_LEGACY)spring-doc.cadn.net.cn

spring.data.mongodb.gridfs

GridFS存储桶名称。 (字符串,默认值:<none>)spring-doc.cadn.net.cn

数据库

GridFS 数据库名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.mongodb.ssl
捆绑

SSL捆绑包名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

启用

是否启用SSL支持。如果提供了"bundle",则会自动启用,除非另外指定。(布尔值,默认:<none>)spring-doc.cadn.net.cn

7.10. MQTT接收器

此模块用于向MQTT发送消息。spring-doc.cadn.net.cn

7.10.1. 负载:

7.10.2. 选项

MQTT接收器具有以下选项:mqttspring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

mqtt
clean-session

客户端和服务器是否应记住重启和重新连接之间的状态。(布尔值,默认:true)spring-doc.cadn.net.cn

connection-timeout

连接超时时间(秒)。 (Integer, 默认值:30)spring-doc.cadn.net.cn

keep-alive-interval

ping间隔,单位为秒。(整数,默认值:60)spring-doc.cadn.net.cn

密码

连接到代理时要使用的密码。(字符串,默认:guest)spring-doc.cadn.net.cn

持久化

'memory' 或 'file'。 (字符串,默认:memory)spring-doc.cadn.net.cn

persistence-directory

持久化目录。 (字符串,默认:/tmp/pahospring-doc.cadn.net.cn

ssl-properties

MQTT 客户端 SSL 属性。(Map<String, String>,默认:<none>spring-doc.cadn.net.cn

url

MQTT代理的位置(以逗号分隔的列表)。(字符串数组,默认:[tcp://localhost:1883])spring-doc.cadn.net.cn

用户名

连接到代理时使用的用户名。(字符串,默认:guest)spring-doc.cadn.net.cn

mqtt.consumer
异步

是否使用异步发送。(布尔类型,默认:false)spring-doc.cadn.net.cn

编码

用于将字符串负载转换为字节的字符集。(字符串,默认:UTF-8)spring-doc.cadn.net.cn

client-id

标识客户端。(字符串,默认:stream.client.id.sink)spring-doc.cadn.net.cn

服务质量

要使用的服务的质量。(整数,默认:1)spring-doc.cadn.net.cn

保留

是否设置 'retained' 标志。 (布尔值,默认:false)spring-doc.cadn.net.cn

主题

接收器将发布的主题。(字符串,默认:stream.mqtt)spring-doc.cadn.net.cn

7.11. Pgcopy 填充器

一个模块,使用PostgreSQL COPY命令将传入的有效负载写入RDBMS。spring-doc.cadn.net.cn

7.11.1. 输入

headers
负载

列表达式将根据消息进行评估,该表达式通常仅与一种类型(如 Map 或 bean 等)兼容。spring-doc.cadn.net.cn

7.11.2. 输出

7.11.3. 选项

jdbc接收器有以下选项:jdbcspring-doc.cadn.net.cn

spring.datasource.driver-class-name

JDBC 驱动程序的完全限定名称。默认情况下根据 URL 自动检测。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.datasource.password

数据库登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.datasource.url

数据库的 JDBC URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.datasource.username

数据库登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

该模块还使用了 Spring Boot 的 DataSource 支持 来配置数据库连接,因此像 spring.datasource.url 等属性也适用。

7.11.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

你可以在这里找到相应的活页夹基础项目。 然后,你可以进入其中一个文件夹并进行构建:spring-doc.cadn.net.cn

$ ./mvnw clean package

要运行集成测试,请在本地主机上启动一个 PostgreSQL 数据库:spring-doc.cadn.net.cn

    docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest

7.11.5. 示例

java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test

7.12. RabbitMQ Sink

此模块向RabbitMQ发送消息。spring-doc.cadn.net.cn

7.12.1. 选项

rabbit 接收器有以下选项:
spring-doc.cadn.net.cn

(请参阅Spring Boot文档中的RabbitMQ连接属性)spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

兔子
converter-bean-name

自定义消息转换器的bean名称;如果省略,则使用SimpleMessageConverter。如果为'jsonConverter',将为您创建一个Jackson2JsonMessageConverter bean。(字符串,默认:<none>)spring-doc.cadn.net.cn

交换

交换名称 - 如果提供了 exchangeNameExpression,则会被覆盖。(字符串,默认:<empty string>)spring-doc.cadn.net.cn

exchange-expression

一个计算结果为交换名称的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

headers-mapped-last

在映射传出消息的标头时,确定是在转换消息之前还是之后进行标头映射。(布尔型,默认:true)spring-doc.cadn.net.cn

mapped-request-headers

要映射的标头。 (String[], 默认值: [*])spring-doc.cadn.net.cn

own-connection

为 true 时,使用基于引导属性的单独连接。(布尔类型,默认值:false)spring-doc.cadn.net.cn

persistent-delivery-mode

当没有 'amqp_deliveryMode' 标头时的默认传递模式,PERSISTENT 表示为 true。(Boolean,默认:false)spring-doc.cadn.net.cn

routing-key

路由键 - 如果提供了 routingKeyExpression,则会被覆盖。(字符串,默认:<none>)spring-doc.cadn.net.cn

routing-key-expression

一个计算路由键的SpEL表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

spring.rabbitmq
address-shuffle-mode

用于打乱配置地址的模式。(AddressShuffleMode,默认值:none,可选值:NONERANDOMINORDER)spring-doc.cadn.net.cn

地址

客户端应连接的地址列表,用逗号分隔。设置后,主机和端口将被忽略。(字符串,默认:<none>)spring-doc.cadn.net.cn

channel-rpc-timeout

通道中RPC调用的继续超时时间。将其设置为零以无限期等待。(持续时间,默认:10m)spring-doc.cadn.net.cn

connection-timeout

连接超时时间。将其设置为零以无限期等待。(持续时间,默认:<none>)spring-doc.cadn.net.cn

主机

RabbitMQ主机。如果设置了地址,则忽略此设置。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

登录以向代理进行身份验证。(字符串,默认:guest)spring-doc.cadn.net.cn

端口

RabbitMQ 端口。如果设置了地址,则忽略此设置。默认为 5672,如果启用了 SSL,则默认为 5671。(Integer,默认:<none>)spring-doc.cadn.net.cn

publisher-confirm-type

发布者类型确认要使用的。(ConfirmType,默认值:<none>,可能的取值:SIMPLECORRELATEDNONE)spring-doc.cadn.net.cn

publisher-returns

是否启用发布商返还。(布尔值,默认:falsespring-doc.cadn.net.cn

requested-channel-max

客户端请求的每个连接的通道数。使用0表示无限制。(整数,默认:2047)spring-doc.cadn.net.cn

requested-heartbeat

请求的心跳超时时间;零表示无。如果未指定持续时间后缀,则使用秒。(持续时间,默认:<none>)spring-doc.cadn.net.cn

用户名

登录用户以对经纪人进行身份验证。(字符串,默认:guest)spring-doc.cadn.net.cn

virtual-host

连接到代理时使用的虚拟主机。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.rabbitmq.template
default-receive-queue

当未明确指定时,接收消息的默认队列名称。(字符串,默认值:<none>)spring-doc.cadn.net.cn

交换

发送操作使用的默认交换机的名称。(字符串,默认:<empty string>)spring-doc.cadn.net.cn

必填

是否启用强制消息。(布尔值,默认:<none>spring-doc.cadn.net.cn

receive-timeout

接收操作的超时时间。(持续时间,默认:<none>)spring-doc.cadn.net.cn

reply-timeout

发送和接收操作的超时时间。(持续时间,默认:<none>)spring-doc.cadn.net.cn

routing-key

用于发送操作的默认路由密钥值。(字符串,默认:<empty string>)spring-doc.cadn.net.cn

7.13. Redis存储

向Redis发送消息。spring-doc.cadn.net.cn

7.13.1. 选项

Redis接收器具有以下选项:redisspring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

redis.consumer

存储密钥时使用的字面量键名。(字符串,默认:<none>)spring-doc.cadn.net.cn

key-expression

A SpEL 表达式,用于存储到键中。(字符串,默认:<none>spring-doc.cadn.net.cn

队列

存储到队列时要使用的队列名称字面量。(字符串,默认:<none>)spring-doc.cadn.net.cn

queue-expression

用于队列的 SpEL 表达式。(字符串,默认值:<none>)spring-doc.cadn.net.cn

主题

发布到主题时使用的文字主题名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

topic-expression

用于主题的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.redis
client-name

连接上要设置的客户端名称,通过 CLIENT SETNAME 设置。 (字符串,默认:<none>)spring-doc.cadn.net.cn

client-type

要使用的客户端类型。默认情况下,根据类路径自动检测。(ClientType,默认值:<none>,可能的取值:LETTUCEJEDIS)spring-doc.cadn.net.cn

connect-timeout

连接超时。(持续时间,默认:<none>)spring-doc.cadn.net.cn

数据库

连接工厂使用的数据库索引。(整数,默认:0)spring-doc.cadn.net.cn

主机

Redis服务器主机。(字符串,默认:localhostspring-doc.cadn.net.cn

密码

Redis服务器的登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

Redis服务器端口。(整数,默认:6379spring-doc.cadn.net.cn

timeout

读取超时时间。(持续时间,默认:<none>)spring-doc.cadn.net.cn

url

连接 URL。覆盖主机、端口、用户名和密码。例如:redis://user:[[email protected]]:6379 (字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

登录 Redis 服务器的用户名。(字符串,默认值:<none>)spring-doc.cadn.net.cn

spring.data.redis.cluster
max-redirects

执行命令跨越集群时要遵循的最大重定向次数。(整数,默认:<none>)spring-doc.cadn.net.cn

节点

用于引导的“主机:端口”对的逗号分隔列表。这表示一个“初始”的集群节点列表,且至少需要有一个条目。(List<String>, 默认值:<none>)spring-doc.cadn.net.cn

spring.data.redis.jedis.pool
启用

是否启用连接池。如果可用“commons-pool2”,则会自动启用。使用Jedis时,在哨兵模式下,默认启用连接池,此设置仅适用于单节点配置。(布尔型,缺省值:<none>)spring-doc.cadn.net.cn

max-active

连接池在某一时间可以分配的最大连接数。使用负值表示无限制。(Integer,默认:8)spring-doc.cadn.net.cn

max-idle

连接池中"空闲"连接的最大数量。使用负值表示空闲连接数没有上限。(Integer,默认:8)spring-doc.cadn.net.cn

max-wait

连接池耗尽时,分配连接应阻塞的最大时间(以抛出异常)。使用负值表示无限期阻塞。(持续时间,默认:-1ms)spring-doc.cadn.net.cn

min-idle

池中要维持的最少空闲连接数的目标。只有当此设置和驱逐运行之间的间隔都为正时,才会产生效果。(整数,默认:0)spring-doc.cadn.net.cn

time-between-eviction-runs

空闲对象驱逐线程运行的时间间隔。如果为正数,则启动空闲对象驱逐线程,否则不执行任何空闲对象清除操作。(持续时间,默认:<none>)spring-doc.cadn.net.cn

spring.data.redis.lettuce.pool
启用

是否启用连接池。如果可用“commons-pool2”,则会自动启用。使用Jedis时,在哨兵模式下,默认启用连接池,此设置仅适用于单节点配置。(布尔型,缺省值:<none>)spring-doc.cadn.net.cn

max-active

连接池在某一时间可以分配的最大连接数。使用负值表示无限制。(Integer,默认:8)spring-doc.cadn.net.cn

max-idle

连接池中"空闲"连接的最大数量。使用负值表示空闲连接数没有上限。(Integer,默认:8)spring-doc.cadn.net.cn

max-wait

连接池耗尽时,分配连接应阻塞的最大时间(以抛出异常)。使用负值表示无限期阻塞。(持续时间,默认:-1ms)spring-doc.cadn.net.cn

min-idle

池中要维持的最少空闲连接数的目标。只有当此设置和驱逐运行之间的间隔都为正时,才会产生效果。(整数,默认:0)spring-doc.cadn.net.cn

time-between-eviction-runs

空闲对象驱逐线程运行的时间间隔。如果为正数,则启动空闲对象驱逐线程,否则不执行任何空闲对象清除操作。(持续时间,默认:<none>)spring-doc.cadn.net.cn

spring.data.redis.lettuce
shutdown-timeout

关闭超时时间。(持续时间,默认:100msspring-doc.cadn.net.cn

spring.data.redis.sentinel
master

Redis服务器的名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

节点

用冒号分隔的“主机:端口”对列表。(List<String>, 默认:<none>)spring-doc.cadn.net.cn

密码

用于与哨兵(sentinel)进行身份验证的密码。(字符串,默认:<none>spring-doc.cadn.net.cn

用户名

用于与哨兵(s)进行身份验证的登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.redis.ssl
捆绑

SSL捆绑包名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

启用

是否启用SSL支持。如果提供了"bundle",则会自动启用,除非另外指定。(布尔值,默认:<none>)spring-doc.cadn.net.cn

7.14. 路由器接收器

此应用程序将消息路由到命名通道。spring-doc.cadn.net.cn

7.14.1. 选项

路由器接收器具有以下选项:routerspring-doc.cadn.net.cn

router.default-output-binding

无法路由的消息发送到何处。(字符串,默认:<none>)spring-doc.cadn.net.cn

router.destination-mappings

目的地映射作为以换行符分隔的名称-值对字符串,例如 'foo=bar\ baz=car'。 (属性,默认:<none>)spring-doc.cadn.net.cn

router.expression

应用于消息以确定路由到的通道的表达式。请注意,对于文本、JSON 或 XML 等内容类型,有效负载的字节格式为 byte[] 而不是 String!请查阅文档了解如何处理字节数组有效负载内容。(表达式,默认:<none>)spring-doc.cadn.net.cn

router.refresh-delay

检查脚本更改的时间间隔(毫秒);如果为负数,则不刷新。(Integer,默认值:60000)spring-doc.cadn.net.cn

router.resolution-required

是否需要通道解析。(布尔类型,默认:false)spring-doc.cadn.net.cn

router.script

返回通道或通道映射解析键的 Groovy 脚本的位置。(资源,默认:<none>)spring-doc.cadn.net.cn

router.variables

变量绑定作为以换行符分隔的名称-值对字符串,例如 'foo=bar baz=car'。(属性,默认:<none>)spring-doc.cadn.net.cn

router.variables-location

包含自定义脚本变量绑定的属性文件的位置。(资源,缺省:<none>)spring-doc.cadn.net.cn

此路由器接收器基于 Spring Cloud Stream 的 StreamBridge API,因此可以根据需要创建目标。

spring-doc.cadn.net.cn

在这种情况下,如果键未包含在 destinationMappings 中,则可以到达 defaultOutputBindingspring-doc.cadn.net.cn

resolutionRequired = true 忽略了 defaultOutputBinding,并且如果没有映射和相应的绑定已声明,则会抛出异常。spring-doc.cadn.net.cn

您可以使用 spring.cloud.stream.dynamicDestinations 属性限制动态绑定的创建。默认情况下,所有解析的目标都会被动态绑定;如果此属性包含一个以逗号分隔的目标名称列表,则只有这些目标会被绑定。解析到不在该列表中的目的地的消息将路由到 defaultOutputBinding,它也必须出现在该列表中。spring-doc.cadn.net.cn

代码destinationMappings用于将评估结果映射到实际的目标名称。spring-doc.cadn.net.cn

7.14.2. 基于 SpEL 的路由

该表达式针对消息进行求值,并返回一个频道名称,或者是一个频道名称映射表的键。spring-doc.cadn.net.cn

有关更多信息,请参阅《Spring Integration参考手册》中的“路由器与Spring表达式语言(SpEL)”小节。配置通用路由器部分。spring-doc.cadn.net.cn

7.14.3. 基于Groovy的路由

除了使用 SpEL 表达式之外,还可以使用 Groovy 脚本。让我们在文件系统中创建一个 Groovy 脚本,位于 "file:/my/path/router.groovy" 或者 "classpath:/my/path/router.groovy" :spring-doc.cadn.net.cn

println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}

如果您希望向脚本传递变量值,可以使用variables选项静态绑定值,或者选择性地通过propertiesLocation选项传入包含绑定信息的属性文件路径。文件中的所有属性都将作为变量提供给脚本。您可以同时指定variablespropertiesLocation,在这种情况下,以variables提供的任何重复值都会覆盖在propertiesLocation中提供的值。

请注意payloadheaders是隐式绑定的,以便您能够访问消息中包含的数据。spring-doc.cadn.net.cn

有关更多信息,请参阅Spring Integration参考手册Groovy支持spring-doc.cadn.net.cn

7.15. RSocket Sink

RSocket 使用 RSocket 协议的 fire-and-forget 策略将数据发送到消息接收器。spring-doc.cadn.net.cn

7.15.1. 选项

rsocket 接收器具有以下选项:
spring-doc.cadn.net.cn

rsocket.consumer.host

RSocket 主机。 (字符串,默认值:localhostspring-doc.cadn.net.cn

rsocket.consumer.port

RSocket 端口。(整数,默认值:7000spring-doc.cadn.net.cn

rsocket.consumer.route

用于 RSocket 的路由。(字符串,默认:<none>)spring-doc.cadn.net.cn

rsocket.consumer.uri

可用于基于 WebSocket 的传输的 URI。(URI,默认:<none>)spring-doc.cadn.net.cn

7.16. Amazon S3 接收器

此接收应用程序支持将对象传输到Amazon S3存储桶。
文件负载(以及递归目录)被传输到remote目录(S3存储桶),然后传输到应用程序部署所在的local目录。spring-doc.cadn.net.cn

此接收器接受的消息必须包含以下之一:payloadspring-doc.cadn.net.cn

7.16.1. 选项

s3接收器具有以下选项:
spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

s3.consumer
访问控制列表

S3 对象访问控制列表。(ObjectCannedACL,默认值:<none>,可能的值:privatepublic-readpublic-read-writeauthenticated-readaws-exec-readbucket-owner-readbucket-owner-full-controlnull)spring-doc.cadn.net.cn

acl-expression

要评估的 S3 对象访问控制列表表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

目标文件存储的 AWS 存储桶。(字符串,默认:<none>spring-doc.cadn.net.cn

bucket-expression

要计算的AWS存储桶名称表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

key-expression

要评估的 S3 对象键表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

spring.cloud.aws.credentials
access-key

与静态提供程序一起使用的访问密钥。(字符串,默认值:<none>)spring-doc.cadn.net.cn

instance-profile

使用默认配置来配置实例配置文件凭据提供程序。(布尔类型,默认:false)spring-doc.cadn.net.cn

个人资料

AWS 配置文件。 (配置文件,缺省:<none>)spring-doc.cadn.net.cn

secret-key

使用静态提供程序的密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.cloud.aws.region
instance-profile

使用默认配置设置实例配置文件区域提供程序。(布尔值,默认:false)spring-doc.cadn.net.cn

个人资料

AWS 配置文件。 (配置文件,缺省:<none>)spring-doc.cadn.net.cn

静态

<文档缺失> (字符串,默认:<none>spring-doc.cadn.net.cn

spring.cloud.aws.s3
accelerate-mode-enabled

访问 S3 时启用加速端点的选项。加速端点通过使用 Amazon CloudFront 的全球分布边缘位置,允许更快地传输对象。(布尔值,默认:<none>)spring-doc.cadn.net.cn

checksum-validation-enabled

选项:禁用对存储在S3中的对象校验和进行验证。(布尔类型,默认:<none>)spring-doc.cadn.net.cn

chunked-encoding-enabled

用于在对 {@link software.amazon.awssdk.services.s3.model.PutObjectRequest} 和 {@link software.amazon.awssdk.services.s3.model.UploadPartRequest} 签名请求负载时启用分块编码的选项。(布尔值,默认:<none>)spring-doc.cadn.net.cn

cross-region-enabled

启用跨区域存储桶访问。(布尔类型,默认为<none>spring-doc.cadn.net.cn

端点

覆盖默认端点。(URI,默认:<none>)spring-doc.cadn.net.cn

path-style-access-enabled

启用使用路径样式访问方式来访问S3对象,而不是DNS样式访问。推荐使用DNS样式访问,因为它在访问S3时会实现更好的负载均衡。(布尔值,默认:<none>)spring-doc.cadn.net.cn

区域

覆盖默认区域。 (字符串,默认:<none>)spring-doc.cadn.net.cn

use-arn-region-enabled

如果在作为具有与客户端配置区域不同的区域的S3操作目标时传递了S3资源ARN,则必须将此标志设置为'true',以允许客户端向ARN中指定的区域进行跨区域调用,否则将抛出异常。(布尔型,默认:<none>)spring-doc.cadn.net.cn

spring.cloud.aws.s3.crt
initial-read-buffer-size-in-bytes

配置客户端用于缓冲从S3下载部分数据的初始缓冲区大小。维持较大的窗口以保持较高的下载吞吐量;除非窗口足够大以容纳多个部分,否则无法并行下载部分数据。维持较小的窗口以限制内存中缓冲的数据量。(长整型,默认:<none>)spring-doc.cadn.net.cn

max-concurrency

指定在传输期间应建立的最大 S3 连接数。(整数,默认:<none>)spring-doc.cadn.net.cn

minimum-part-size-in-bytes

设置传输部分的最小大小。减小最小部分大小会导致多部分传输被分割成更多更小的部分。将此值设置得太低会对传输速度产生负面影响,为每个部分增加额外的延迟和网络通信开销。(Long,默认:<none>)spring-doc.cadn.net.cn

target-throughput-in-gbps

传输请求的目标吞吐量。较高的值意味着会打开更多的 S3 连接。传输管理器是否能够达到配置的目标吞吐量取决于多种因素,例如环境的网络带宽和配置的 `max-concurrency`。(Double,默认:<none>)spring-doc.cadn.net.cn

基于AmazonS3SinkConfiguration生成的目标应用程序可以使用S3MessageHandler.UploadMetadataProvider和/或S3ProgressListener进行增强,这两个组件被注入到S3MessageHandler Bean中。有关详细信息,请参阅Spring集成AWS支持。spring-doc.cadn.net.cn

7.16.2. 亚马逊AWS通用选项

Amazon S3 连接器(如同所有其他 Amazon AWS 应用程序)基于 Spring Cloud AWS 项目作为基础,其自动配置类由 Spring Boot 自动使用。请查阅相关文档了解所需和有用的自动配置属性。spring-doc.cadn.net.cn

其中一些是关于AWS凭据的:spring-doc.cadn.net.cn

其他用于 AWS Region 定义:spring-doc.cadn.net.cn

示例
java -jar s3-sink.jar --s3.bucket=/tmp/bar

7.17. SFTP接收器

SFTP接收器是从传入消息向SFTP服务器推送文件的简单选项。spring-doc.cadn.net.cn

它使用sftp-outbound-adapter,因此传入的消息可以是java.io.File对象、String(文件内容)或bytes数组(文件内容也是这样)。spring-doc.cadn.net.cn

要使用此接收器,您需要一个用户名和密码来登录。spring-doc.cadn.net.cn


默认情况下,如果未指定,则 Spring Integration 将使用 o.s.i.file.DefaultFileNameGenerator。如果DefaultFileNameGenerator基于file_name标头(如果存在)的值来确定文件名,在MessageHeaders中,或者如果Message的有效载荷已经是java.io.File,则它会
使用该文件的原始名称。

配置 sftp.factory.known-hosts-expression 选项时,评估的根对象是应用程序上下文,例如sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'spring-doc.cadn.net.cn

7.17.1. 输入

headers

7.17.2. 输出

N/A (写入到 SFTP 服务器)。spring-doc.cadn.net.cn

7.17.3. 选项

sftp 接收器具有以下选项:
spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

sftp.consumer
auto-create-dir

是否创建远程目录。(布尔类型,默认:true)spring-doc.cadn.net.cn

filename-expression

用于生成远程文件名的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

模式

远程文件已存在时要采取的操作。(FileExistsMode,默认:<none>,可选值:APPENDAPPEND_NO_FLUSHFAILIGNOREREPLACEREPLACE_IF_MODIFIED)spring-doc.cadn.net.cn

remote-dir

远程 FTP 目录。(字符串,默认:/spring-doc.cadn.net.cn

remote-file-separator

远程文件分隔符。(字符串,默认:/)spring-doc.cadn.net.cn

temporary-remote-dir

如果 isUseTemporaryFilename() 为 true,则文件将被写入的临时目录。(字符串,默认:/)spring-doc.cadn.net.cn

tmp-file-suffix

传输进行期间使用的后缀。(字符串,默认:.tmp)spring-doc.cadn.net.cn

use-temporary-filename

是否写入临时文件并重命名。(布尔类型,默认:true)spring-doc.cadn.net.cn

sftp.consumer.factory
allow-unknown-keys

允许未知或已更改的密钥。 (Boolean,默认:false)spring-doc.cadn.net.cn

cache-sessions

缓存会话。 (布尔值,默认:<none>spring-doc.cadn.net.cn

主机

服务器的主机名。(字符串,默认:localhost)spring-doc.cadn.net.cn

known-hosts-expression

一个解析为已知主机文件位置的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

pass-phrase

用户私钥的密码。(字符串,默认:<empty string>)spring-doc.cadn.net.cn

密码

连接到服务器时使用的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

服务器的端口。(整数,默认:22)spring-doc.cadn.net.cn

private-key

用户私钥的位置。 (资源,默认:<none>)spring-doc.cadn.net.cn

用户名

连接服务器时要使用的用户名。(字符串,默认值:<none>)spring-doc.cadn.net.cn

7.18. TCP接收器

此模块使用编码器通过TCP写入消息。spring-doc.cadn.net.cn

TCP是一个流式协议,需要一些机制来在传输线上封装消息。提供了多种编码器,默认的是'CRLF'。spring-doc.cadn.net.cn

7.18.1. 选项

TCP接收器具有以下选项:tcpspring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

tcp.consumer
编码

转换字节为字符串时使用的字符集。(String,默认:UTF-8)spring-doc.cadn.net.cn

关闭

每条消息后是否关闭套接字。(布尔值,默认:false)spring-doc.cadn.net.cn

编码器

发送消息时使用的编码器。(编码,缺省:<none>,可用值:CRLFLFNULLSTXETXRAWL1L2L4)spring-doc.cadn.net.cn

主机

此接收器将要连接的主机。(字符串,默认:<none>)spring-doc.cadn.net.cn

TCP
中文

是否使用 NIO。(布尔值,默认:falsespring-doc.cadn.net.cn

端口

要监听的端口号;0 表示操作系统选择一个端口。(整数,默认:1234)spring-doc.cadn.net.cn

reverse-lookup

对远程IP地址执行反向DNS查找;如果为false,则仅在消息头中包含IP地址。(布尔值,默认:falsespring-doc.cadn.net.cn

socket-timeout

套接字在未收到数据前超时关闭的时间(毫秒)。(Integer,缺省:120000)spring-doc.cadn.net.cn

use-direct-buffers

是否使用直接缓冲区。(布尔值,默认:false)spring-doc.cadn.net.cn

7.18.2. 可用编解码器

文本数据
CRLF(默认)

文本由回车符(0x0d)后跟换行符(0x0a)终止spring-doc.cadn.net.cn

LF

以换行符终止的文本(0x0a)spring-doc.cadn.net.cn

null

以空字节(0x00)终止的文本spring-doc.cadn.net.cn

STXETX

以 STX (0x02) 开头并以 ETX (0x03) 结尾的文本spring-doc.cadn.net.cn

文本和二进制数据
原始内容

无结构 - 客户端通过关闭套接字来表示一条完整的消息spring-doc.cadn.net.cn

L1

以一个字节(无符号)长度字段开头的数据(支持最多255字节)spring-doc.cadn.net.cn

L2

以两个字节(无符号)长度字段开头的数据(最多为216-1个字节)spring-doc.cadn.net.cn

L4

以四个字节(有符号)长度字段开头的数据(最多可达231-1字节)spring-doc.cadn.net.cn

7.19. 吞吐量接收器

收集器将计算消息数量,并在选定的时间间隔内记录观察到的吞吐量。spring-doc.cadn.net.cn

7.19.1. 选项

通过吞吐量接收器具有以下选项:spring-doc.cadn.net.cn

throughput.report-every-ms

报告频率。(整数,默认:1000spring-doc.cadn.net.cn

7.20. Twitter 消息接收器

从认证用户向指定用户发送直接消息。需要JSON POST请求体,并将Content-Type头部设置为application/jsonspring-doc.cadn.net.cn

当您收到用户的消息时,可以在24小时内回复最多5条消息。每收到一条消息都会重置24小时的时间窗口和5条消息的配额。在24小时内发送第6条消息或在24小时时间窗口外发送消息会被计入速率限制。此行为仅适用于使用POST direct_messages/events/new端点时。

Spring Expression Language (SpEL) 表达式用于从输入消息计算请求参数。spring-doc.cadn.net.cn

7.20.1. 选项

使用单引号 (') 来包裹 SpEL 表达式属性的字面值。 例如,要设置固定的消息文本,请使用 text='Fixed Text'。 对于固定的目标 userId,请使用 userId='666'
twitter.message.update.media-id

与消息关联的媒体ID。直接消息只能引用单个媒体ID。(表达式,默认:<none>)spring-doc.cadn.net.cn

twitter.message.update.screen-name

要发送直聊的用户的屏幕名称。(表达式,默认:<none>)spring-doc.cadn.net.cn

twitter.message.update.text

直接消息文本。如有必要,请进行URL编码。最大长度为10,000个字符。(表达式,默认:payload)spring-doc.cadn.net.cn

twitter.message.update.user-id

要发送直聊的消息的用户 ID。 (表达式,默认:<none>)spring-doc.cadn.net.cn

7.21. 推特更新接收器

更新认证用户的当前文本(例如发布推文)。spring-doc.cadn.net.cn

每次更新尝试时,都会将更新文本与认证用户的近期推文进行比较。任何会导致重复的尝试都将被阻止,并导致返回403错误。用户不能连续两次提交相同的文本。

虽然API本身没有进行速率限制,但用户在一次时间内可以创建的推文数量是有限制的。标准API的更新限制为每3小时窗口内最多300条。如果该用户的更新次数达到当前允许的上限,此方法将返回HTTP 403错误。spring-doc.cadn.net.cn

7.21.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

twitter.update
attachment-url

(SpEL 表达式) 要使 URL 不计入扩展推文的文本正文中,请提供作为推文附件的 URL。该 URL 必须是推文永久链接或直接消息深度链接。任意非 Twitter 的 URL 必须保留在文本中。传递给 attachment_url 参数且不符合任何推文永久链接或直接消息深度链接的 URL 将在创建推文时失败并引发异常。(表达式,默认:<none>)spring-doc.cadn.net.cn

display-coordinates

(SpEL表达式) 是否在发送推文的确切坐标上放置一个标记。 (表达式,默认:<none>)spring-doc.cadn.net.cn

in-reply-to-status-id

(SpEL 表达式) 已存在的文本ID,更新将是对此文本的回复。注意:除非在此参数引用的推文作者在文本中被提及,否则将忽略此参数。因此,您必须包含@username,其中username是所引用推文的作者,在更新中。当设置inReplyToStatusId时,auto_populate_reply_metadata也会自动设置。这确保从原始推文中查找前导@mentions,并将其添加到新推文中。这将在扩展推文的元数据中追加@mentions,直到达到@mentions的限制。如果原始推文已被删除,则回复将会失败。(表达式,默认:<none>)spring-doc.cadn.net.cn

media-ids

(SpEL 表达式) 与推文关联的媒体 ID 列表,用逗号分隔。您可以在一条推文中包含最多 4 张照片或 1 个动画 GIF 或 1 个视频。有关上传媒体的更多详细信息,请参阅上传媒体。(表达式,默认:<none>)spring-doc.cadn.net.cn

place-id

(SpEL表达式)世界上的一个地方。(表达式,默认:<none>spring-doc.cadn.net.cn

text

(SpEL 表达式) 文本更新的文本。如有需要,请进行 URL 编码。t.co 链接包裹将影响字符计数。默认为消息的有效负载(表达式,默认:payload)spring-doc.cadn.net.cn

twitter.update.location
纬度

此推文引用位置的纬度。除非该参数位于-90.0到+90.0(北为正)范围内,否则将被忽略。如果不存在相应的经度参数,则也会被忽略。(表达式,默认:<none>)spring-doc.cadn.net.cn

此推文所指位置的经度。有效的经度范围为-180.0到+180.0(东为正)。如果超出该范围,不是数字,geo_enabled被禁用,或者没有对应的纬度参数,则忽略该参数。(表达式,默认:<none>)spring-doc.cadn.net.cn

7.22. 波前接收器

Wavefront 接收器消耗 Message<?>,将其转换为波形数据格式中的指标,然后直接发送到 Wavefront 或者 Wavefront 代理。Wavefront 数据格式spring-doc.cadn.net.cn

支持常见的ETL用例,其中现有的(历史)指标数据必须经过清洗、转换并存储在Wavefront中以供进一步分析。spring-doc.cadn.net.cn

7.22.1. 选项

波形图接收器具有以下选项:Wavefrontspring-doc.cadn.net.cn

wavefront.api-token

Wavefront API 访问Tokens。(字符串,默认:<none>spring-doc.cadn.net.cn

wavefront.metric-expression

一个SpEL表达式,用于计算指标值。(表达式,默认:<none>)spring-doc.cadn.net.cn

wavefront.metric-name

指标的名称,默认为应用程序名称。(字符串,默认值:<none>)spring-doc.cadn.net.cn

wavefront.proxy-uri

波形代理的URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

wavefront.source

发出指标的唯一应用程序、主机、容器或实例。(字符串,默认:<none>)spring-doc.cadn.net.cn

wavefront.tag-expression

与指标点标记关联的自定义元数据集合不能为空。键的有效字符为字母数字、连字符('-')、下划线('_')、点('.')。对于值,任何字符都允许,包括空格。要包含双引号,请用反斜杠转义,反斜杠不能是标记值中的最后一个字符。点标记键和值组合的最大长度为254个字符(包括分隔键和值的 '=' 号,共255个字符)。如果值较长,则拒绝该点并记录日志(Map<String, Expression>, 默认: <none>)spring-doc.cadn.net.cn

wavefront.timestamp-expression

一个 SpEL 表达式,用于计算指标的时间戳(可选)。(表达式,默认:<none>)spring-doc.cadn.net.cn

wavefront.uri

Wavefront 环境的 URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.23. WebSocket接收器

一个简单的 Websocket Sink 实现。spring-doc.cadn.net.cn

7.23.1. 选项

支持以下选项:<br/>spring-doc.cadn.net.cn

websocket.consumer.log-level

netty通道的日志级别。默认为<tt>WARN</tt>(字符串,默认:<none>)spring-doc.cadn.net.cn

websocket.consumer.path

WebSocketSink 消费者需要连接的路径。默认值为 <tt>/websocket</tt> (字符串,默认:/websocket)spring-doc.cadn.net.cn

websocket.consumer.port

Netty服务器监听的端口。默认为9292(整数,默认:9292)spring-doc.cadn.net.cn

websocket.consumer.ssl

是否创建一个 {@link io.netty.handler.ssl.SslContext}。 (Boolean,缺省值: false)spring-doc.cadn.net.cn

websocket.consumer.threads

Netty {@link io.netty.channel.EventLoopGroup} 的线程数。默认是 <tt>1</tt> (Integer,默认:1)spring-doc.cadn.net.cn

7.23.2. 示例

为了验证 websocket-sink 是否从其他 spring-cloud-stream 应用接收消息,您可以使用以下简单的端到端设置。spring-doc.cadn.net.cn

第一步:启动Rabbitmq
步骤2:部署time-source
第3步:部署websocket-sink

最后,以trace模式启动一个websocket接收器,以便在日志中看到由time-source产生的消息:spring-doc.cadn.net.cn

java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
	--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE

你应该开始在启动WebsocketSink的控制台中看到日志消息,如下所示:spring-doc.cadn.net.cn

Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]

7.23.3. 执行器

您可以使用Endpoint来访问最后发送和接收的n条消息。您必须通过提供--endpoints.websocketconsumertrace.enabled=true来启用它。默认情况下,它会通过host:port/websocketconsumertrace显示最后100条消息。以下是样本输出:spring-doc.cadn.net.cn

 [
   {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]

还有一个简单的HTML页面,您可以在文本区域中看到转发的消息。您可以直接通过浏览器中的host:port访问它。spring-doc.cadn.net.cn

7.24. XMPP 收集器

"xmpp"接收器可以将消息发送到XMPP服务器。spring-doc.cadn.net.cn

7.24.1. 输入

7.24.2. 输出

7.24.3. 选项

zeromq 接收器有以下选项:
spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

xmpp.consumer
chat-to

发送消息到的XMPP处理器。(字符串,默认:<none>)spring-doc.cadn.net.cn

xmpp.factory
主机

要连接的 XMPP 主机服务器。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

连接用户的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

连接到主机的端口。 - 默认客户端端口:5222 (整数,默认:5222)spring-doc.cadn.net.cn

资源

要绑定到 XMPP 主机上的资源。- 可以为空,如果未设置服务器会自动生成一个 (字符串,默认为 <none>)spring-doc.cadn.net.cn

security-mode

<文档缺失> (安全模式,默认值:<none>,可选值:requiredifpossibledisabled)spring-doc.cadn.net.cn

service-name

为 XMPP 域设置的服务名称。(字符串,默认值:<none>)spring-doc.cadn.net.cn

subscription-mode

<documentation missing> (订阅模式,默认值: <none>,可能的取值: accept_all,reject_all,manual)spring-doc.cadn.net.cn

user

用户连接时应作为。 (字符串,默认:<none>)spring-doc.cadn.net.cn

7.24.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在以下位置找到相应的绑定器项目。<br>然后进入其中一个文件夹并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

7.25. ZeroMQ 汇入

"zeromq"接收器启用向ZeroMQ套接字发送消息。spring-doc.cadn.net.cn

7.25.1. 输入

7.25.2. 输出

7.25.3. 选项

zeromq 接收器有以下选项:
spring-doc.cadn.net.cn

zeromq.consumer.connect-url

连接到 ZeroMQ 套接字的连接 URL。(字符串,默认值:<none>)spring-doc.cadn.net.cn

zeromq.consumer.socket-type

连接应建立的套接字类型。(SocketType,默认:<none>,可能值:PAIRPUBSUBREQREPDEALERROUTERPULLPUSHXPUBXSUBSTREAMCLIENTSERVERRADIODISHCHANNELPEERRAWSCATTERGATHER)spring-doc.cadn.net.cn

zeromq.consumer.topic

发送消息给订阅者之前,用于评估主题的表达式。 (表达式,默认:<none>)spring-doc.cadn.net.cn

7.25.4. Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在以下位置找到相应的绑定器项目。<br>然后进入其中一个文件夹并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

7.25.5. 示例

java -jar zeromq-sink.jar --zeromq.consumer.connectUrl=tcp://server:port --zeromq.consumer.topic=