应用程序
5. 源码
5.1. Debezium 源
Debezium Engine based Change Data Capture (CDC) source.
The Debezium Source allows capturing database change events and streaming those over different message binders such Apache Kafka, RabbitMQ and all Spring Cloud Stream supporter brokers.
| 该源代码可以与任何 Spring Cloud Stream 消息绑定器一起使用。 不受或不依赖于 Kafka Connect 框架。虽然这种方法灵活,但也会带来某些限制。 |
所有 Debezium 配置属性都受支持。
只需在任何 Debezium 属性前加上 debezium.properties. 前缀。
例如,要设置 Debezium 的 connector.class 属性,请使用 debezium.properties.connector.class 源属性代替。
5.1.1. 数据库支持
The Debezium Source currently supports CDC for multiple datastores: MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, Db2, Vitess and Spanner databases.
5.1.2. 选项
事件平铺配置
Debezium 提供了一种全面的消息格式,可以准确地详细说明系统中发生的变更信息。
有时候这种格式可能并不适合下游消费者,这些消费者可能需要消息格式简化,使得字段名称和值以 flattened 结构呈现。
为简化Debezium 连接器产生的事件记录的格式,您可以使用 Debezium 事件扁平化 消息转换。 使用 扁平化配置,您可以配置简单的消息格式,例如:
--debezium.properties.transforms=unwrap
--debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
--debezium.properties.transforms.unwrap.drop.tombstones=false
--debezium.properties.transforms.unwrap.delete.handling.mode=rewrite
--debezium.properties.transforms.unwrap.add.fields=name,db
Debezium 偏移量存储
When a Debezium source runs, it reads information from the source and periodically records offsets that define how much of that information it has processed.
Should the source be restarted, it will use the last recorded offset to know where in the source information it should resume reading.
Out of the box, the following offset storage configuration options are provided:
-
In-Memory
Doesn't persist the offset data but keeps it in memory. Therefore all offsets are lost on debezium source restart.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore -
本地文件系统
Store the offsets in a file on the local file system (the file can be named anything and stored anywhere). Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in the example below, once each minute).
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore --debezium.properties.offset.storage.file.filename=/tmp/offsets.dat (1) --debezium.properties.offset.flush.interval.ms=60000 (2)1 用于存储偏移量的文件路径。当 offset.storage`设置为FileOffsetBackingStore时必需。2 间隔多久尝试提交偏移量。默认是1分钟。 -
Kafka 主题
Uses a Kafka topic to store offset data.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore --debezium.properties.offset.storage.topic=my-kafka-offset-topic (1) --debezium.properties.offset.storage.partitions=2 (2) --debezium.properties.offset.storage.replication.factor=1 (3) --debezium.properties.offset.flush.interval.ms=60000 (4)1 用于存储偏移量的Kafka主题的名称。当 offset.storage被设置为KafkaOffsetBackingStore时必需。2 在创建偏移存储主题时使用的分区数量。 3 复制因子用于在创建偏移量存储主题时使用。 4 间隔多久尝试提交偏移量。默认是1分钟。
一个可以实现org.apache.kafka.connect.storage.OffsetBackingStore接口以提供与自定义后端键值存储绑定的偏移存储。
连接器属性
下面的表格列出了每个 connecter 可用的所有 Debezium 属性。
这些属性可以通过在前面加上 debezium.properties. 前缀来使用。
5.1.3. 示例和测试
Debezium 集成测试使用本地机器上的数据库固定数据。借助 Testcontainers 使用预构建的Debezium Docker 数据库镜像。
要在您的 IDE 中运行和调试测试,需要从命令行部署所需的数据库映像。下面的说明解释了如何从 Docker 映像运行预配置的测试数据库。
MySQL
在 docker 中启动 debezium/example-mysql:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.3.3.Final
(可选) 使用 mysql 客户端连接到数据库并创建具有所需凭据的 debezium 个用户: |
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
使用以下属性连接 Debezium 源到 MySQL 数据库:
debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector (1)
debezium.properties.name=my-connector (2)
debezium.properties.topic.prefix=my-topic (2)
debezium.properties.database.server.id=85744 (2)
debezium.properties.database.user=debezium (3)
debezium.properties.database.password=dbz (3)
debezium.properties.database.hostname=localhost (3)
debezium.properties.database.port=3306 (3)
debezium.properties.schema=true (4)
debezium.properties.key.converter.schemas.enable=true (4)
debezium.properties.value.converter.schemas.enable=true (4)
debezium.properties.transforms=unwrap (5)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (5)
debezium.properties.transforms.unwrap.add.fields=name,db (5)
debezium.properties.transforms.unwrap.delete.handling.mode=none (5)
debezium.properties.transforms.unwrap.drop.tombstones=true (5)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (6)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (6)
| 1 | 配置 Debezium 源使用 MySqlConnector。 |
| 2 | 用于识别和分派传入事件的元数据。 |
| 3 | 连接到运行在localhost:3306上的MySQL服务器,作为debezium用户。 |
| 4 | 包含在 Change Event Value 模式中的 ChangeEvent 消息。 |
| 5 | 启用 事件扁平化。 |
| 6 | 源状态在多次启动之间进行保留。 |
您也可以使用此mysql配置运行DebeziumDatabasesIntegrationTest#mysql()。
| 禁用 mysql GenericContainer 测试初始化代码。 |
PostgreSQL
从debezium/example-postgres:1.0 Docker镜像启动一个预配置的postgres服务器:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:2.3.3.Final
您可以像这样连接到此服务器:
psql -U postgres -h localhost -p 5432
使用以下属性连接 Debezium 源到 PostgreSQL:
debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=postgres (4)
debezium.properties.database.password=postgres (4)
debezium.properties.database..dbname=postgres (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=5432 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
| 1 | 配置 Debezium Source 以使用 PostgresConnector。 |
| 2 | 配置 Debezium 发引用来使用 memory 存储。 |
| 3 | 用于识别和分派传入事件的元数据。 |
| 4 | 连接到运行在localhost:5432上的PostgreSQL服务器,作为postgres用户。 |
| 5 | 包含在消息中的Change Event Value模式 |
| 6 | 启用 事件扁平化。 |
你也可以使用此 postgres 配置运行 DebeziumDatabasesIntegrationTest#postgres()。
| 禁用 postgres 通用容器测试初始化代码。 |
MongoDB
从 debezium/example-mongodb:2.3.3.Final 容器镜像启动一个预配置的 mongodb:
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:2.3.3.Final
初始化库存集合
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
在 mongodb 终端输出中,搜索类似于 host: "3f95a8a6516e:27017" 的日志条目:
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
在你的/etc/hosts中添加127.0.0.1 3f95a8a6516e条目
使用以下属性连接 Debezium 源到 MongoDB:
debezium.properties.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector (1)
debezium.properties.topic.prefix=my-topic
debezium.properties.name=my-connector
debezium.properties.database.server.id=85744
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.mongodb.hosts=rs0/localhost:27017 (3)
debezium.properties.topic.prefix=dbserver1 (3)
debezium.properties.mongodb.user=debezium (3)
debezium.properties.mongodb.password=dbz (3)
debezium.properties.database.whitelist=inventory (3)
debezium.properties.tasks.max=1 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
| 1 | 配置 Debezium Source 以使用 MongoDB Connector。 |
| 2 | 配置 Debezium 发行引擎使用 memory。 |
| 3 | 连接到运行在localhost:27017上的MongoDB,作为debezium用户。 |
| 4 | debezium.io/docs/connectors/mongodb/#tasks |
| 5 | 在SourceRecord事件中包含更改事件值模式。 |
| 6 | Enables the Chnage Event Flattening. |
你也可以使用此mongodb配置运行DebeziumDatabasesIntegrationTest#mongodb()。
SQL Server
从 debezium/example-postgres:1.0 Docker 镜像启动一个 sqlserver:
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
包含来自 debezium SqlServer 教程的示例数据 Populate:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
使用以下属性连接 Debezium 源到 SQLServer:
debezium.properties.connector.class=io.debezium.connector.sqlserver.SqlServerConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=sa (4)
debezium.properties.database.password=Password! (4)
debezium.properties.database..dbname=testDB (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=1433 (4)
| 1 | 配置 Debezium Source 以使用 SqlServerConnector。 |
| 2 | 配置 Debezium 发引用来使用 memory 状态存储。 |
| 3 | 用于识别和分派传入事件的元数据。 |
| 4 | 连接到在localhost:1433上运行的SQL Server,作为sa用户。 |
你可以使用这种 SQL Server 配置来运行 also 的 0。
Oracle
启动Oracle并从localhost访问,按照Debezium Vagrant设置中描述的配置、用户和授权进行设置
包含来自 Debezium Oracle 示例教程的示例数据的表单:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
5.2. 文件源
此应用程序轮询一个目录,并将新文件或其内容发送到输出通道。<br/>默认情况下,文件源会提供文件的内容作为字节数组。<br/>但是,可以使用 --file.supplier.mode 选项进行自定义:
-
ref 提供一个 java.io.File 引用
-
按行分割文件,并为每一行发出一条新消息
-
内容 默认。提供文件的内容作为字节数组
在使用 --file.supplier.mode=lines 时,还可以提供附加选项 --file.supplier.withMarkers=true。如果将其设置为 true,则底层的 FileSplitter 在实际数据之前和之后会发出额外的文件开始和结束标记消息。这两个额外标记消息的有效载荷类型为 FileSplitter.FileMarker。如果未明确设置,则 withMarkers 选项默认为 false。
5.3. FTP源
此源应用程序支持使用FTP协议传输文件。
文件从remote目录传输到部署应用的local目录中。
默认情况下,源发出的消息以字节数组形式提供。但是,可以使用--mode选项进行自定义:
-
ref 提供一个
java.io.File引用 -
lines 将按行拆分文件,并为每一行发出新消息
-
contents 默认值。提供文件的内容作为字节数组
使用 --mode=lines 时,还可以提供附加选项 --withMarkers=true。如果设置为 true,底层的 FileSplitter 将在实际数据之前和之后发出额外的 文件开始 和 文件结束 标记消息。这两个附加标记消息的有效载荷类型为 FileSplitter.FileMarker。withMarkers 的默认值是 false,除非明确设置。
另请参阅MetadataStore选项,了解用于防止重启时重复消息的共享持久存储配置。
5.3.1. 输入
N/A(从FTP服务器获取文件)。
5.3.2. 输出
模式 = 内容
标题:
-
Content-Type: application/octet-stream -
file_originalFile: <java.io.File> -
file_name: <file name>
负载:
用文件内容填充的 byte[]。
模式 = 行
标题:
-
Content-Type: text/plain -
file_orginalFile: <java.io.File> -
file_name: <file name> -
correlationId: <UUID>(每行相同) -
sequenceNumber: <n> -
sequenceSize: 0(在文件读取之前,行数未知)
负载:
每行一个String。
第一行可选地由带有START标记有效负载的消息前导。
最后一行可选地由带有END标记有效负载的消息后置。
标记的存在和格式由with-markers和markers-json属性确定。
模式 = 参考
标题:
None.
负载:
一个 java.io.File 对象。
5.3.4. 示例
java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
--ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo
5.4. HTTP 源
一个监听HTTP请求并将其主体作为消息负载发出的应用程序。
如果Content-Type匹配text/*或application/json,负载将是一个字符串,
否则负载将是一个字节数组。
5.4.1. 有效载荷:
If content type matches text/* 或 application/json
-
String
如果内容类型不匹配 text/* 或 application/json
-
byte array
5.5. JDBC 源
此源从关系型数据库管理系统(RDBMS)中轮询数据。
此源完全基于 DataSourceAutoConfiguration,有关更多信息,请参阅 Spring Boot JDBC 支持。
5.5.1. 有效载荷
-
Map<String, Object>当jdbc.split == true(默认)且List<Map<String, Object>>其他情况时
5.5.2. 选项
也请参见 Spring Boot 文档
对于额外的 DataSource 属性和 TriggerProperties 以及 MaxMessagesProperties 的轮询选项。
5.7. Apache Kafka Source
此模块从Apache Kafka消费消息。
5.7.1. 选项
The kafka source has the following options:
(参见 Spring for Apache Kafka 的 Spring Boot 配置属性文档)
5.8. 加载生成器源码
一个产生生成数据并将其发送到流的源。
5.8.1. 选项
The load-generator 源代码具有以下选项:
- load-generator.generate-timestamp
-
是否生成时间戳。 (布尔值, 默认:
false) - load-generator.message-count
-
消息数量. (Integer, 默认值:
1000) - load-generator.message-size
-
消息大小。 (整数, 默认值:
1000) - load-generator.producers
-
生产者数量。 (整数, 默认值:
1)
5.10. MongoDB 数据源
此源从MongoDB获取数据。
此源完全基于MongoDataAutoConfiguration,请参阅
Spring Boot MongoDB支持
以获取更多信息。
5.10.1. 选项
The mongodb source 有以下选项:
也请参见 Spring Boot 文档 以了解额外的 MongoProperties 属性。
查看并 TriggerProperties 了解轮询选项。
5.12. RabbitMQ 源
The "rabbit" 源启用接收来自RabbitMQ的消息。
The queue(s) 必须在流部署之前存在;它们不会自动创建。 您可以轻松地使用 RabbitMQ 网页界面创建一个队列。
5.12.1. 输入
N/A
5.12.2. 输出
负载
-
byte[]
5.12.3. 选项
Rabbit 源代码有以下选项:
也请参见 Spring Boot 文档 对于 broker 连接和监听器属性的额外属性。
一个关于重试的注意事项
在默认的 ackMode (AUTO)和 requeue (true)选项下,失败的消息投递将无限次重试。由于Rabbit源本身处理的内容不多,因此源本身出现故障的风险较小,除非下游的Binder因某种原因未连接。将 重试 设置为 false 将导致消息在首次尝试时被拒绝(如果代理配置了死信交换/队列,则可能发送到该位置)。enableRetry 选项允许配置重试参数,以便在消息传递失败时进行重试,并在重试次数用尽后最终丢弃(或死信)。在重试间隔期间,交付线程被挂起。重试选项包括enableRetry、maxAttempts、initialRetryInterval、retryMultiplier和maxRetryInterval。使用 MessageConversionException 导致的消息传递失败永远不会重试;假设如果消息在第一次尝试时无法转换,后续尝试也会失败。此类消息将被丢弃(或转入死信队列)。
|
5.12.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在以下位置找到相应的绑定器项目。<br>然后进入其中一个文件夹并构建它:
$ ./mvnw clean package
5.12.5. 示例
java -jar rabbit-source.jar --rabbit.queues=
5.13. 亚马逊S3源
此源应用程序支持使用 Amazon S3 协议传输文件。 文件从 remote 目录(S3 存储桶)传输到部署应用程序的 local 目录。
源发出的消息默认以字节数组提供。但是,可以使用--mode选项进行自定义:
-
ref 提供一个
java.io.File引用 -
lines 将按行拆分文件,并为每一行发出新消息
-
contents 默认值。提供文件的内容作为字节数组
使用 --mode=lines 时,还可以提供附加选项 --withMarkers=true。如果设置为 true,底层的 FileSplitter 将在实际数据之前和之后发出额外的 文件开始 和 文件结束 标记消息。这两个附加标记消息的有效载荷类型为 FileSplitter.FileMarker。withMarkers 的默认值是 false,除非明确设置。
另请参阅MetadataStore选项,了解用于防止重启时重复消息的共享持久存储配置。
5.13.1. mode = lines
标题:
-
Content-Type: text/plain -
file_orginalFile: <java.io.File> -
file_name: <file name> -
correlationId: <UUID>(每行相同) -
sequenceNumber: <n> -
sequenceSize: 0(在文件读取之前,行数未知)
负载:
每行一个String。
第一行可选地由带有START标记有效负载的消息前导。
最后一行可选地由带有END标记有效负载的消息后置。
标记的存在和格式由with-markers和markers-json属性确定。
5.13.2. 模式 = ref
标题:
None.
负载:
一个 java.io.File 对象。
5.13.4. Amazon AWS 常用选项
Amazon S3 源(与其他所有 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,并且其自动配置类由 Spring Boot 自动使用。有关所需和有用的自动配置属性,请参阅他们的文档。
5.13.5. 示例
java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines
5.14. SFTP 源
此源应用程序支持使用SFTP协议传输文件。文件从remote目录传输到部署应用的local目录。默认情况下,源发出的消息为字节数组。但是,可以使用--mode选项进行自定义:
-
ref 提供一个
java.io.File引用 -
lines 将按行拆分文件,并为每一行发出新消息
-
contents 默认值。提供文件的内容作为字节数组
使用 --mode=lines 时,还可以提供附加选项 --withMarkers=true。如果设置为 true,底层的 FileSplitter 将在实际数据之前和之后发出额外的 文件开始 和 文件结束 标记消息。这两个附加标记消息的有效载荷类型为 FileSplitter.FileMarker。withMarkers 的默认值是 false,除非明确设置。
有关高级配置选项,请参阅sftp-supplier。
另请参阅MetadataStore选项,了解用于防止重启时重复消息的共享持久存储配置。
5.14.1. 输入
N/A(从SFTP服务器获取文件)。
5.14.2. 输出
模式 = 内容
标题:
-
Content-Type: application/octet-stream -
file_name: <file name> -
file_remoteFileInfo <file metadata> -
file_remoteHostPort: <host:port> -
file_remoteDirectory: <relative-path> -
file_remoteFile: <file-name> -
sftp_selectedServer: <server-key>(如果多源)
负载:
用文件内容填充的 byte[]。
模式 = 行
标题:
-
Content-Type: text/plain -
file_name: <file name> -
correlationId: <UUID>(每行相同) -
sequenceNumber: <n> -
sequenceSize: 0(在文件读取之前,行数未知) -
file_marker : <file marker>(如果启用了带标记器)
负载:
每行一个String。
第一行可选地由带有START标记有效负载的消息前导。
最后一行可选地由带有END标记有效负载的消息后置。
标记的存在和格式由with-markers和markers-json属性确定。
模式 = 参考
标题:
-
file_remoteHostPort: <host:port> -
file_remoteDirectory: <relative-path> -
file_remoteFile: <file-name> -
file_originalFile: <absolute-path-of-local-file> -
file_name <local-file-name> -
file_relativePath -
file_remoteFile: <remote-file-name> -
sftp_selectedServer: <server-key>(如果多源)
负载:
一个 java.io.File 对象。
5.14.4. 示例
java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.mode=lines --sftp.supplier.factory.host=sftpserver \
--sftp.supplier.factory.username=user --sftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo
5.16. TCP
源代码tcp用作服务器,允许远程方连接到它并通过原始tcp套接字提交数据。
TCP是一种流式协议,需要某种机制来在传输线上对消息进行定界。提供了多种解码器,默认的是'CRLF',与Telnet兼容。
由TCP源应用程序生成的消息的byte[]有效负载。
5.16.1. 选项
5.16.2. 可用解码器
- CRLF(默认)
-
文本由回车符(0x0d)后跟换行符(0x0a)终止
- LF
-
以换行符终止的文本(0x0a)
- null
-
以空字节(0x00)终止的文本
- STXETX
-
以 STX (0x02) 开头并以 ETX (0x03) 结尾的文本
- 原始内容
-
无结构 - 客户端通过关闭套接字来表示一条完整的消息
- L1
-
以一个字节(无符号)长度字段开头的数据(支持最多255字节)
- L2
-
以两个字节(无符号)长度字段开头的数据(最多为216-1个字节)
- L4
-
以四个字节(有符号)长度字段开头的数据(最多可达231-1字节)
5.17. 时间源
时间源将每隔一段时间简单地发出包含当前时间的字符串。
5.17.1. 可选项
时间源具有以下选项:time
spring.integration.poller
- 定时任务
-
Cron 表达式用于轮询。与 'fixedDelay' 和 'fixedRate' 相互排斥。(字符串,默认:
<none>) - fixed-delay
-
轮询延迟周期。与 'cron' 和 'fixedRate' 相互排斥。(持续时间,默认:
<none>) - fixed-rate
-
轮询速率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认:
<none>) - initial-delay
-
轮询初始延迟。对 'fixedDelay' 和 'fixedRate' 生效;对于 'cron' 被忽略。(持续时间,默认:
<none>) - max-messages-per-poll
-
轮询周期内每次轮询要获取的最大消息数量。(整数,默认:
<none>) - receive-timeout
-
轮询时等待消息的时间。(持续时间,默认:
1s)
5.18. Twitter 消息源
重复获取最近30天内的直接消息(包括已发送和接收的消息),并按时间倒序排列。
缓解后的消息被缓存(在MetadataStore缓存中)以防止重复。
默认使用内存中的SimpleMetadataStore。
该twitter.message.source.count控制返回消息的数量。
属性spring.cloud.stream.poller控制 the message poll interval。
必须与使用的API速率限制一致
5.18.1. 选项
5.19. Twitter搜索源
Twitter 的 标准搜索 API(search/tweets)允许对最近或热门推文的索引进行简单查询。这个 Source 提供了针对过去 7 天内发布的新近推文抽样的连续搜索。是 'public' API 集合的一部分。
返回与指定查询匹配的相关推文集合。
使用spring.cloud.stream.poller属性来控制连续搜索请求之间的间隔。速率限制-每30分钟窗口内180个请求(例如,约6次/分钟,约1次/10秒)
该twitter.search查询属性允许通过关键字进行查询,并根据时间和地理位置过滤结果。
代码twitter.search.count和twitter.search.page控制搜索API的结果分页。
注意:Twitter 的搜索服务以及由此扩展的搜索 API 并不打算作为推文的全面来源。并非所有的推文都会被索引或通过搜索界面提供。
5.19.1. 选项
5.20. Twitter 流源
-
The
Filter API返回与一个或多个过滤器谓词匹配的公共状态。 使用多个参数可以使用单个连接到流式API。 提示:track、follow和locations字段用或运算符组合! 包含track=foo和follow=1234的查询返回匹配test或由用户1234创建的推文。 -
该
Sample API返回所有公共状态的小随机样本。
默认访问级别的推文返回的结果相同,因此如果两个不同的客户端连接到此端点,则它们会看到相同的推文。
默认访问级别允许最多400个跟踪关键字、5,000个关注用户ID以及25个0.1-360度的位置框。
5.20.1. 可选项
5.21. WebSocket 源
通过 WebSocket 生产消息的 Websocket 源。
5.21.1. 选项
5.21.2. 示例
要验证 websocket-source 是否从 WebSocket 客户端接收消息,可以使用以下简单的端到端设置。
步骤1:启动kafka
步骤2:将websocket-source部署到特定端口,例如8080
第3步:在端口8080路径“/websocket”上连接一个WebSocket客户端,并发送一些消息。
您可以启动一个kafka控制台消费者并在那里查看消息。
5.22. XMPP 源
"xmpp"源可用来从XMPP服务器接收消息。
5.22.1. 输入
N/A
5.22.2. 输出
负载
-
byte[]
5.22.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在以下位置找到相应的绑定器项目。<br>然后进入其中一个文件夹并构建它:
$ ./mvnw clean package
5.22.5. 示例
java -jar xmpp-source.jar --xmpp.factory.host=localhost --xmpp.factory.port=5222 --xmpp.factory.user=jane --xmpp.factory.password=secret --xmpp.factory.service-name=localhost
5.23. ZeroMQ 源
“zeromq”源启用从ZeroMQ接收消息。
5.23.1. 输入
N/A
5.23.2. 输出
负载
-
byte[]
5.23.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在以下位置找到相应的绑定器项目。<br>然后进入其中一个文件夹并构建它:
$ ./mvnw clean package
5.23.5. 示例
java -jar zeromq-source.jar --zeromq.supplier.connectUrl=tcp://server:port --zeromq.supplier.topics=
6. 处理器
6.1. 聚合处理器
聚合处理器使应用程序能够将传入的消息分组并释放到输出目标中。
java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
如果你想将其运行在 RabbitMQ 上,请将 Kafka 更改为 Rabbit。
6.1.1. 负载
如果输入负载为 byte[] 并且内容类型标题为 JSON,则 JsonBytesToMap 函数尝试反序列化此负载以获得更好的数据表示形式,以便在聚合器函数的输出中使用。此外,这种 Map 数据表示方式使得从下面提到的 SpEL 表达式轻松访问负载内容成为可能。否则(包括反序列化错误),输入负载保持不变——目标应用程序配置将其转换为所需的形式。
6.1.2. 选项
6.3 过滤器处理器
过滤器处理器允许应用程序检查传入的有效载荷,然后对其应用谓词以决定是否需要继续记录。 例如,如果传入的有效载荷类型为 String 并且您想筛选掉少于五个字符的内容,则可以按如下方式运行过滤器处理器。
java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4
如果你想将其运行在 RabbitMQ 上,请将 Kafka 更改为 Rabbit。
6.3.1. 负载
您可以传递任何类型的负载,然后对其应用SpEL表达式进行过滤。如果传入的类型是byte[]且内容类型设置为text/plain或application/json,则应用程序会将byte[]转换为String。
6.3.2. 选项
6.4. Groovy 处理器
一个处理器,用于在消息上应用 Groovy 脚本。
6.4.1. 选项
该 groovy-processor 处理器具有以下选项:
- groovy-processor.script
-
指向用于处理消息的脚本的引用。(资源,缺省:
<none>) - groovy-processor.variables
-
变量绑定作为以换行符分隔的名称-值对字符串,例如 'foo=bar baz=car'。(属性,默认:
<none>) - groovy-processor.variables-location
-
包含自定义脚本变量绑定的属性文件的位置。(资源,缺省:
<none>)
6.5. Header Enricher Processor
使用 header-enricher 应用添加消息头。
标题以换行符分隔的键值对形式提供,其中键是标题名称,值为SpEL表达式。
例如 --headers='foo=payload.someProperty \n bar=payload.otherProperty'。
6.6. Http 请求处理器
一个处理器应用程序,该程序向HTTP资源发出请求,并将响应正文作为消息有效负载进行发布。
6.6.1. 输入
headers
任何必需的 HTTP 头部必须通过 headers 或 headers-expression 属性显式设置。请参见下面的示例。
头部值也可用于构造:
-
当在
body-expression属性中引用时,请求正文。 -
当在
http-method-expression属性中引用时,HTTP方法。 -
在
url-expression属性中引用时的URL。
负载
默认情况下,负载用作 POST 请求的请求体,并且可以是任何 Java 类型。对于 GET 请求,它应该是一个空字符串。负载还可以用于构建:
-
当在
body-expression属性中引用时,请求正文。 -
当在
http-method-expression属性中引用时,HTTP方法。 -
在
url-expression属性中引用时的URL。
底层的WebClient支持Jackson JSON序列化,以支持任何请求和响应类型。expected-response-type属性,默认为String.class,可以设置为您应用程序类路径中的任何类。请注意,用户定义的有效负载类型需要向您的pom文件添加所需的依赖项。
6.6.2. 输出
headers
没有HTTP消息头被映射到出站Message。
负载
原始输出对象是ResponseEntity<?>,其任何字段(例如:body、headers)或访问器方法(statusCode)都可以作为reply-expression的一部分引用。默认情况下,传出消息的有效负载是响应体。请注意,Jackson 默认无法反序列化 ResponseEntity(由表达式 #root 引用),但可以将其渲染为HashMap。
6.6.3. 选项
The http-request processor has the following options:
未解决的指令在处理器.adoc中 - include::/home/runner/work/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/../../applications/processor/image-recognition-processor/README.adoc[tags=ref-doc]
processors.adoc 中未解析的指令:include::/home/runner/work/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/../../applications/processor/object-detection-processor/README.adoc[tags=ref-doc]
processors.adoc 中未解析的指令(include::/home/runner/work/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/../../applications/processor/semantic-segmentation-processor/README.adoc[tags=ref-doc])
6.7 脚本处理器
使用脚本转换消息的处理器。脚本正文作为属性值直接提供。可以指定脚本语言(groovy/javascript/ruby/python)。
6.7.1. 选项
脚本处理器有以下选项:script-processor
- script-processor.language
-
脚本属性中文本的语言。支持的类型为:groovy、javascript、ruby、python。(字符串,默认值:
<none>) - script-processor.script
-
脚本文本。(字符串,默认:
<none>) - script-processor.variables
-
变量绑定作为以换行符分隔的名称-值对字符串,例如 'foo=bar baz=car'。(属性,默认:
<none>) - script-processor.variables-location
-
包含自定义脚本变量绑定的属性文件的位置。(资源,缺省:
<none>)
6.8. 拆分器处理器
拆分器应用程序建立在Spring Integration中的同名概念之上,允许将单个消息拆分为多个不同的消息。
处理器使用一个函数作为输入Message<?>,然后根据各种属性产生输出List<Message<?>(见下文)。
您可以使用SpEL表达式或定界符来指定如何拆分传入的消息。
6.8.1. 负载
-
传入的有效负载 -
Message<?>
如果传入的类型是 byte[] 并且内容类型设置为 text/plain 或 application/json,则应用程序会将 byte[] 转换为 String。
-
传出负载 -
List<Message<?>
6.8.2. 选项
6.9 转换处理器
转换器处理器允许您基于SpEL表达式来转换消息负载结构。
以下是运行此应用程序的示例。
java -jar transform-processor-kafka-<version>.jar \
--spel.function.expression=payload.toUpperCase()
如果你想将其运行在 RabbitMQ 上,请将 Kafka 更改为 Rabbit。
6.9.1. 负载
传入的消息可以包含任何类型的负载。
6.9.2. 选项
6.10. Twitter热搜及地点处理器
能够返回热门话题或热门话题位置的处理器。twitter.trend.trend-query-type属性允许选择查询类型。
6.10.1. 获取特定位置的趋势话题(可选)
在此模式下,将twitter.trend.trend-query-type设置为trend。
6.10.2. 获取趋势位置
在此模式下,将twitter.trend.trend-query-type设置为trendLocation。
通过位置检索热门话题的完整列表或附近的位置列表。
如果未提供latitude、longitude参数,则处理器执行可用趋势API,并返回Twitter具有话题趋势信息的位置。
如果提供了latitude、longitude参数,处理器会执行趋势最近API并返回Twitter具有趋势话题信息的位置,这些位置最接近指定的位置。
响应是一个 locations 数组,用于编码位置的 WOEID 和其他人类可读信息,例如规范名称和该位置所属国家。
6.10.3. 选项
7. 消息通道
7.1. Cassandra Sink(Cassandra 接收器)
此接收应用程序将接收到的每个消息的内容写入Cassandra。
它期望一个JSON字符串作为负载,并使用其属性来映射到表列。
7.1.1. 负载
表示要持久化的实体(或实体列表)的 JSON 字符串或字节数组。
7.2. 分析接收器
基于分析消费者构建的接收应用程序,该程序从输入消息中计算分析,并将分析结果作为指标发布到各种监控系统。它利用micrometer库在最流行的监控系统上提供统一的编程体验,并公开Spring表达式语言(SpEL)属性,用于定义如何从输入数据中计算度量名称、值和标签。
分析接收器可以生成两种指标类型:
一个 计量器(例如计数器或仪表)由其唯一的 name 和 dimensions 确定(术语维度和标签可以互换使用)。维度允许对特定命名指标进行切片,以便深入分析和推理数据。
由于指标由其name和dimensions唯一标识,因此您可以为每个指标分配多个标签(例如键/值对),但之后不能随意更改这些标签!如果具有相同名称的指标具有不同的标签集,则监控系统(如Prometheus)会发出警告。 |
使用analytics.name或analytics.name-expression属性设置输出分析指标的名称。如果没有设置,指标名称默认为应用程序的名称。
使用analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>属性可以向您的指标添加一个或多个标签。TAG_NAME在属性定义中将作为标签名称出现在指标中。TAG_VALUE是SpEL表达式,该表达式从传入的消息动态计算出标签值。
表达式使用SpEL和headers关键字来访问消息的头和有效载荷值。
您可以使用字面量(例如 'fixed value')来设置具有固定值的标签。 |
所有流应用程序开箱即用地支持三种最流行的监控系统,Wavefront、Prometheus 和 InfluxDB,您可以通过声明方式启用每种系统。通过只需向 Analytics Sink 应用程序添加它们的 Micrometer Meter-Registry 依赖项即可添加对其他监控系统的支持。
请访问 Spring Cloud Data Flow 流监控 获取配置监控系统的详细说明。以下快速片段可帮助您开始。
-
要启用 Prometheus 计量注册表,请设置以下属性。
management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
-
要启用Wavefront计数器注册表,请设置以下属性。
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
-
要启用 InfluxDB 计量注册表,请设置以下属性。
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
如果启用了数据流服务器监控,则Analytics Sink将重用提供的度量配置。 |
下图说明了Analytics Sink如何帮助收集股票交易所和实时数据管道的业务内部信息。
7.2.1. 负载
传入的消息可以包含任何类型的负载。
7.2.2. 选项
7.3. Elasticsearch 写入器
将文档索引到Elasticsearch中的接收器。<br>
此Elasticsearch接收器仅支持索引JSON文档。
它从输入目标获取数据,然后将其索引到Elasticsearch中。
输入数据可以是普通的json字符串,或者表示JSON的java.util.Map。
它还接受作为Elasticsearch提供的XContentBuilder的数据。
然而,在中间件不将记录保存为XContentBuilder的情况下,这种情况很少见。
这主要是为了直接调用消费者而提供的。
7.3.2. 运行此接收器的示例
-
从文件夹
elasticsearch-sink:./mvnw clean package -
cd apps
-
切换到正确的绑定器生成的应用程序(Kafka 或 RabbitMQ)
-
./mvnw clean package -
请确保您已运行Elasticsearch。例如,您可以使用以下命令将其作为docker容器运行。
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2 -
如果中间件(Kafka 或 RabbitMQ)尚未运行,请先启动它。
-
java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing -
将一些JSON数据发送到中间件目的地。例如:
{"foo":"bar"} -
验证数据是否已索引:
curl localhost:9200/testing/_search
7.5. FTP 接收器
FTP接收器是从传入的消息推送到FTP服务器的简单选项。
它使用ftp-outbound-adapter,因此传入的消息可以是java.io.File对象、String(文件内容)或bytes数组(文件内容也是这样)。
要使用此接收器,您需要一个用户名和密码来登录。
默认情况下,如果未指定,则 Spring Integration 将使用 o.s.i.file.DefaultFileNameGenerator。如果DefaultFileNameGenerator基于file_name标头(如果存在)的值来确定文件名,在MessageHeaders中,或者如果Message的有效载荷已经是java.io.File,则它会使用该文件的原始名称。 |
7.5.1. 头部
-
file_name(见上文注释)
7.5.2. 负载
-
java.io.File -
java.io.InputStream -
byte[] -
String
7.5.3. 输出
不适用(写入FTP服务器)。
7.6. JDBC Sink
JDBC接收器允许您将传入的有效负载持久保存到关系型数据库管理系统(RDBMS)数据库中。
jdbc.consumer.columns 属性表示 COLUMN_NAME[:EXPRESSION_FOR_VALUE] 的键值对,其中 EXPRESSION_FOR_VALUE(连同冒号)是可选的。 在这种情况下,该值通过生成的表达式(如 payload.COLUMN_NAME)进行评估,因此我们可以通过这种方式实现从对象属性到表列的直接映射。 例如,我们有一个如下所示的 JSON 负载:
{
"name": "My Name",
"address": {
"city": "Big City",
"street": "Narrow Alley"
}
}
因此,我们可以使用以下配置将数据插入到具有 name、city 和 street 结构的表中:
--jdbc.consumer.columns=name,city:address.city,street:address.street
此接收器支持批量插入,具体取决于底层的 JDBC 驱动程序。 通过 batch-size 和 idle-timeout 属性配置批量插入: 当接收到的消息达到 batch-size 条时进行聚合,并作为一批次插入。 如果在没有新消息的情况下经过了 idle-timeout 毫秒,则即使批次大小小于 batch-size,也会插入已聚合的数据,以此来限制最大延迟。
该模块还使用了 Spring Boot 的 DataSource 支持 来配置数据库连接,因此像 spring.datasource.url 等属性也适用。 |
7.6.1. 示例
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names \
--jdbc.consumer.columns=name \
--spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.6.2. Payload
7.7. Apache Kafka 写入器
此模块向Apache Kafka发布消息。
7.7.1. 选项
<strong>kafka</strong>接收器有以下选项:
(参见 Spring for Apache Kafka 的 Spring Boot 配置属性文档)
7.8. 日志接收器
该 log 接收器使用应用程序记录器输出数据以供检查。
请理解,log 接收器使用无类型的处理器,这会影响实际的日志记录方式。这意味着如果内容类型为文本,则原始负载字节将被转换为字符串;否则将记录原始字节。用户指南中包含更多信息。
7.9. MongoDB Sink
此接收应用程序将传入数据存入MongoDB。
该应用程序完全基于MongoDataAutoConfiguration,因此请参阅Spring Boot MongoDB支持获取更多信息。
7.9.1. 输入
负载
-
任何 POJO
-
String -
byte[]
7.11. Pgcopy 填充器
一个模块,使用PostgreSQL COPY命令将传入的有效负载写入RDBMS。
7.11.1. 输入
headers
负载
-
任何
列表达式将根据消息进行评估,该表达式通常仅与一种类型(如 Map 或 bean 等)兼容。
7.11.2. 输出
N/A
7.11.3. 选项
jdbc接收器有以下选项:jdbc
该模块还使用了 Spring Boot 的 DataSource 支持 来配置数据库连接,因此像 spring.datasource.url 等属性也适用。 |
7.11.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
你可以在这里找到相应的活页夹基础项目。 然后,你可以进入其中一个文件夹并进行构建:
$ ./mvnw clean package
要运行集成测试,请在本地主机上启动一个 PostgreSQL 数据库:
docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest
7.11.5. 示例
java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.14. 路由器接收器
此应用程序将消息路由到命名通道。
7.14.1. 选项
路由器接收器具有以下选项:router
- router.default-output-binding
-
无法路由的消息发送到何处。(字符串,默认:
<none>) - router.destination-mappings
-
目的地映射作为以换行符分隔的名称-值对字符串,例如 'foo=bar\ baz=car'。 (属性,默认:
<none>) - router.expression
-
应用于消息以确定路由到的通道的表达式。请注意,对于文本、JSON 或 XML 等内容类型,有效负载的字节格式为 byte[] 而不是 String!请查阅文档了解如何处理字节数组有效负载内容。(表达式,默认:
<none>) - router.refresh-delay
-
检查脚本更改的时间间隔(毫秒);如果为负数,则不刷新。(Integer,默认值:
60000) - router.resolution-required
-
是否需要通道解析。(布尔类型,默认:
false) - router.script
-
返回通道或通道映射解析键的 Groovy 脚本的位置。(资源,默认:
<none>) - router.variables
-
变量绑定作为以换行符分隔的名称-值对字符串,例如 'foo=bar baz=car'。(属性,默认:
<none>) - router.variables-location
-
包含自定义脚本变量绑定的属性文件的位置。(资源,缺省:
<none>)
此路由器接收器基于 Spring Cloud Stream 的 StreamBridge API,因此可以根据需要创建目标。在这种情况下,如果键未包含在
|
您可以使用 spring.cloud.stream.dynamicDestinations 属性限制动态绑定的创建。默认情况下,所有解析的目标都会被动态绑定;如果此属性包含一个以逗号分隔的目标名称列表,则只有这些目标会被绑定。解析到不在该列表中的目的地的消息将路由到 defaultOutputBinding,它也必须出现在该列表中。
代码destinationMappings用于将评估结果映射到实际的目标名称。
7.14.2. 基于 SpEL 的路由
该表达式针对消息进行求值,并返回一个频道名称,或者是一个频道名称映射表的键。
有关更多信息,请参阅《Spring Integration参考手册》中的“路由器与Spring表达式语言(SpEL)”小节。配置通用路由器部分。
7.14.3. 基于Groovy的路由
除了使用 SpEL 表达式之外,还可以使用 Groovy 脚本。让我们在文件系统中创建一个 Groovy 脚本,位于 "file:/my/path/router.groovy" 或者 "classpath:/my/path/router.groovy" :
println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
return "foo"
}
else {
return "bar"
}
如果您希望向脚本传递变量值,可以使用variables选项静态绑定值,或者选择性地通过propertiesLocation选项传入包含绑定信息的属性文件路径。文件中的所有属性都将作为变量提供给脚本。您可以同时指定variables和propertiesLocation,在这种情况下,以variables提供的任何重复值都会覆盖在propertiesLocation中提供的值。
请注意payload和headers是隐式绑定的,以便您能够访问消息中包含的数据。
有关更多信息,请参阅Spring Integration参考手册Groovy支持。
7.16. Amazon S3 接收器
此接收应用程序支持将对象传输到Amazon S3存储桶。
文件负载(以及递归目录)被传输到remote目录(S3存储桶),然后传输到应用程序部署所在的local目录。
此接收器接受的消息必须包含以下之一:payload
-
File,包括用于递归上传的目录; -
InputStream; -
byte[]
7.16.1. 选项
该s3接收器具有以下选项:
基于AmazonS3SinkConfiguration生成的目标应用程序可以使用S3MessageHandler.UploadMetadataProvider和/或S3ProgressListener进行增强,这两个组件被注入到S3MessageHandler Bean中。有关详细信息,请参阅Spring集成AWS支持。
7.16.2. 亚马逊AWS通用选项
Amazon S3 连接器(如同所有其他 Amazon AWS 应用程序)基于 Spring Cloud AWS 项目作为基础,其自动配置类由 Spring Boot 自动使用。请查阅相关文档了解所需和有用的自动配置属性。
其中一些是关于AWS凭据的:
-
spring.cloud.aws.credentials.accessKey
-
spring.cloud.aws.credentials.secretKey
-
spring.cloud.aws.credentials.instanceProfile
-
spring.cloud.aws.credentials.profileName
-
spring.cloud.aws.credentials.profilePath
其他用于 AWS Region 定义:
-
cloud.aws.region.auto
-
cloud.aws.region.static
示例
java -jar s3-sink.jar --s3.bucket=/tmp/bar
7.17. SFTP接收器
SFTP接收器是从传入消息向SFTP服务器推送文件的简单选项。
它使用sftp-outbound-adapter,因此传入的消息可以是java.io.File对象、String(文件内容)或bytes数组(文件内容也是这样)。
要使用此接收器,您需要一个用户名和密码来登录。
默认情况下,如果未指定,则 Spring Integration 将使用 o.s.i.file.DefaultFileNameGenerator。如果DefaultFileNameGenerator基于file_name标头(如果存在)的值来确定文件名,在MessageHeaders中,或者如果Message的有效载荷已经是java.io.File,则它会使用该文件的原始名称。 |
配置 sftp.factory.known-hosts-expression 选项时,评估的根对象是应用程序上下文,例如sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'。
7.17.1. 输入
headers
-
file_name(见上文注释)
负载
-
java.io.File -
java.io.InputStream -
byte[] -
String
7.17.2. 输出
N/A (写入到 SFTP 服务器)。
7.18. TCP接收器
此模块使用编码器通过TCP写入消息。
TCP是一个流式协议,需要一些机制来在传输线上封装消息。提供了多种编码器,默认的是'CRLF'。
7.18.2. 可用编解码器
- CRLF(默认)
-
文本由回车符(0x0d)后跟换行符(0x0a)终止
- LF
-
以换行符终止的文本(0x0a)
- null
-
以空字节(0x00)终止的文本
- STXETX
-
以 STX (0x02) 开头并以 ETX (0x03) 结尾的文本
- 原始内容
-
无结构 - 客户端通过关闭套接字来表示一条完整的消息
- L1
-
以一个字节(无符号)长度字段开头的数据(支持最多255字节)
- L2
-
以两个字节(无符号)长度字段开头的数据(最多为216-1个字节)
- L4
-
以四个字节(有符号)长度字段开头的数据(最多可达231-1字节)
7.19. 吞吐量接收器
收集器将计算消息数量,并在选定的时间间隔内记录观察到的吞吐量。
7.19.1. 选项
通过吞吐量接收器具有以下选项:
- throughput.report-every-ms
-
报告频率。(整数,默认:
1000)
7.20. Twitter 消息接收器
从认证用户向指定用户发送直接消息。需要JSON POST请求体,并将Content-Type头部设置为application/json。
| 当您收到用户的消息时,可以在24小时内回复最多5条消息。每收到一条消息都会重置24小时的时间窗口和5条消息的配额。在24小时内发送第6条消息或在24小时时间窗口外发送消息会被计入速率限制。此行为仅适用于使用POST direct_messages/events/new端点时。 |
Spring Expression Language (SpEL) 表达式用于从输入消息计算请求参数。
7.20.1. 选项
使用单引号 (') 来包裹 SpEL 表达式属性的字面值。 例如,要设置固定的消息文本,请使用 text='Fixed Text'。 对于固定的目标 userId,请使用 userId='666'。 |
7.21. 推特更新接收器
更新认证用户的当前文本(例如发布推文)。
| 每次更新尝试时,都会将更新文本与认证用户的近期推文进行比较。任何会导致重复的尝试都将被阻止,并导致返回403错误。用户不能连续两次提交相同的文本。 |
虽然API本身没有进行速率限制,但用户在一次时间内可以创建的推文数量是有限制的。标准API的更新限制为每3小时窗口内最多300条。如果该用户的更新次数达到当前允许的上限,此方法将返回HTTP 403错误。
您可以在此处找到更新API的详细信息:developer.twitter.com/zh-CN/docs/tweets/post-and-engage/api-reference/post-statuses-update
7.21.1. 选项
7.22. 波前接收器
Wavefront 接收器消耗 Message<?>,将其转换为波形数据格式中的指标,然后直接发送到 Wavefront 或者 Wavefront 代理。Wavefront 数据格式
支持常见的ETL用例,其中现有的(历史)指标数据必须经过清洗、转换并存储在Wavefront中以供进一步分析。
7.23. WebSocket接收器
一个简单的 Websocket Sink 实现。
7.23.2. 示例
为了验证 websocket-sink 是否从其他 spring-cloud-stream 应用接收消息,您可以使用以下简单的端到端设置。
第一步:启动Rabbitmq
步骤2:部署time-source
第3步:部署websocket-sink
最后,以trace模式启动一个websocket接收器,以便在日志中看到由time-source产生的消息:
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE
你应该开始在启动WebsocketSink的控制台中看到日志消息,如下所示:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
7.23.3. 执行器
您可以使用Endpoint来访问最后发送和接收的n条消息。您必须通过提供--endpoints.websocketconsumertrace.enabled=true来启用它。默认情况下,它会通过host:port/websocketconsumertrace显示最后100条消息。以下是样本输出:
[
{
"timestamp": 1445453703508,
"info": {
"type": "text",
"direction": "out",
"id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
"payload": "2015-10-21 20:54:33"
}
},
...
{
"timestamp": 1445453703506,
"info": {
"type": "text",
"direction": "out",
"id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
"payload": "2015-10-21 20:54:32"
}
}
]
还有一个简单的HTML页面,您可以在文本区域中看到转发的消息。您可以直接通过浏览器中的host:port访问它。
7.24. XMPP 收集器
"xmpp"接收器可以将消息发送到XMPP服务器。
7.24.1. 输入
-
byte[]
7.24.2. 输出
负载
N/A
7.24.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在以下位置找到相应的绑定器项目。<br>然后进入其中一个文件夹并构建它:
$ ./mvnw clean package
7.25. ZeroMQ 汇入
"zeromq"接收器启用向ZeroMQ套接字发送消息。
7.25.1. 输入
-
byte[]
7.25.2. 输出
负载
N/A
7.25.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在以下位置找到相应的绑定器项目。<br>然后进入其中一个文件夹并构建它:
$ ./mvnw clean package
7.25.5. 示例
java -jar zeromq-sink.jar --zeromq.consumer.connectUrl=tcp://server:port --zeromq.consumer.topic=