应用
5. 资料来源
5.1. 德贝齐姆源
基于Debezium引擎的变更数据采集(CDC)来源。
这德贝齐姆源允许捕获数据库变更事件并通过不同的消息绑定器进行流式传输,例如阿帕奇·卡夫卡,兔子MQ以及所有春云流支持经纪人。
| 该源可用于任何 Spring Cloud Stream 消息绑定器。 它不受 Kafka Connect 框架的限制,也不依赖于此。虽然这种方法灵活,但也存在一定的局限性。 |
支持所有 Debezium 配置属性。
只需在任何Debezium性质之前用debezium.properties。前缀。
例如,将Debezium设为connector.class财产 使用Debezium.properties.connector.class而是用源属性。
5.1.1. 数据库支持
这德贝齐姆源目前支持多种数据存储的CDC:MySQL、PostgreSQL、MongoDB、Oracle、SQL Server、Db2、Vitess 和 Spanner 数据库。
5.1.2. 选项
事件扁平化配置
Debezium 提供了全面的消息格式,准确描述系统中发生的变化信息。
不过,有时这种格式可能不适合下游消费者,因为他们可能需要格式化,使字段名称和值以简化的形式呈现,扁平结构。
为了简化Debezium连接器产生的事件记录格式,可以使用Debezium事件扁平化消息转换。 通过这个“恭维”配置,你可以配置如下简单的消息格式:
--debezium.properties.transforms=unwrap
--debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
--debezium.properties.transforms.unwrap.drop.tombstones=false
--debezium.properties.transforms.unwrap.delete.handling.mode=rewrite
--debezium.properties.transforms.unwrap.add.fields=name,db
Debezium 偏移存储
当Debezium源运行时,它会读取源信息并定期记录补偿这决定了它处理了多少信息。
如果源被重启,它会使用最后记录的偏移量来判断应在源信息中恢复读取的哪个位置。
开箱即用,提供以下偏移存储配置选项:
-
内存内
Doesn't persist the offset data but keeps it in memory. Therefore all offsets are lost on debezium source restart.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore -
本地文件系统
Store the offsets in a file on the local file system (the file can be named anything and stored anywhere). Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in the example below, once each minute).
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore --debezium.properties.offset.storage.file.filename=/tmp/offsets.dat (1) --debezium.properties.offset.flush.interval.ms=60000 (2)1 指向文件的路径,偏移量将被存储。要求 偏置.存储”设置为FileOffsetBackingStore.2 尝试提交抵消的间隔。默认是1分钟。 -
卡夫卡话题
Uses a Kafka topic to store offset data.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore --debezium.properties.offset.storage.topic=my-kafka-offset-topic (1) --debezium.properties.offset.storage.partitions=2 (2) --debezium.properties.offset.storage.replication.factor=1 (3) --debezium.properties.offset.flush.interval.ms=60000 (4)1 卡夫卡主题的名称,用于存储偏移量。要求 offset.storage设置为KafkaOffsetBackingStore.2 创建偏移存储主题时使用的分区数量。 3 创建偏移存储主题时使用的复制因子。 4 尝试提交抵消的间隔。默认是1分钟。
可以实现org.apache.kafka.connect.storage.OffsetBackingStore接口输入,提供与自定义后端键值存储绑定的偏移存储。
连接器属性
下表列出了每个连接器所有可用的Debezium性质。
这些属性可以通过在debezium.properties。前缀。
5.1.3. 示例与测试
debezium集成测试使用数据库夹具,运行在本地机器上。利用预构建的 debezium docker 数据库镜像,借助 Testcontainers 进行应用。
要从IDE运行和调试测试,你需要从命令行部署所需的数据库镜像。 下面的说明说明了如何运行预配置的测试数据库,格式为Docker镜像。
MySQL
开始Debezium/Example-MySQL在 Docker 中:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.3.3.Final
(可选)用MySQL客户端连接到数据库并创建德贝齐姆具备所需凭证的用户: |
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
使用以下属性将 Debezium 源代码连接到 MySQL 数据库:
debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector (1)
debezium.properties.name=my-connector (2)
debezium.properties.topic.prefix=my-topic (2)
debezium.properties.database.server.id=85744 (2)
debezium.properties.database.user=debezium (3)
debezium.properties.database.password=dbz (3)
debezium.properties.database.hostname=localhost (3)
debezium.properties.database.port=3306 (3)
debezium.properties.schema=true (4)
debezium.properties.key.converter.schemas.enable=true (4)
debezium.properties.value.converter.schemas.enable=true (4)
debezium.properties.transforms=unwrap (5)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (5)
debezium.properties.transforms.unwrap.add.fields=name,db (5)
debezium.properties.transforms.unwrap.delete.handling.mode=none (5)
debezium.properties.transforms.unwrap.drop.tombstones=true (5)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (6)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (6)
| 1 | 配置Debezium源代码以使用MySqlConnector。 |
| 2 | 元数据用于识别和分发进入事件。 |
| 3 | 连接到运行在 的 MySQL 服务器本地主持人:3306如德贝齐姆用户。 |
| 4 | 包含变更事件值模式变更事件消息。 |
| 5 | 使变更事件平坦化成为可能。 |
| 6 | 源状态用于多次启动之间的保存。 |
你也可以运行DebeziumDatabasesIntegrationTest#mysql()使用这个 MySQL 配置。
| 禁用 mysql GenericContainer 测试初始化代码。 |
PostgreSQL
从Debezium/example-postgres:1.0Docker 镜像:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:2.3.3.Final
你可以这样连接到这个服务器:
psql -U postgres -h localhost -p 5432
使用以下属性将 Debezium 源代码连接到 PostgreSQL:
debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=postgres (4)
debezium.properties.database.password=postgres (4)
debezium.properties.database..dbname=postgres (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=5432 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
| 1 | 配置德贝齐姆源使用 PostgresConnector。 |
| 2 | 配置Debezium发动机以使用存储商店。 |
| 3 | 元数据用于识别和分发进入事件。 |
| 4 | 连接到运行于 的 PostgreSQL 服务器本地主机:5432如后文用户。 |
| 5 | 在消息中包含变更事件值的模式。 |
| 6 | 使Chage事件平坦化成为可能。 |
你也可以运行DebeziumDatabasesIntegrationTest#postgres()使用该 Postgres 配置。
| 禁用 postgres GenericContainer 测试初始化代码。 |
MongoDB
从debezium/example-mongodb:2.3.3.Final容器图片:
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:2.3.3.Final
初始化库存集合
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
在蒙戈德终端输出,搜索类似的日志条目主持人:“3F95A8A6516E:27017”:
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
加127.0.0.1 3f95a8a6516e进入你的/等等/主持人
使用以下属性将Debezium Source与MongoDB连接:
debezium.properties.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector (1)
debezium.properties.topic.prefix=my-topic
debezium.properties.name=my-connector
debezium.properties.database.server.id=85744
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.mongodb.hosts=rs0/localhost:27017 (3)
debezium.properties.topic.prefix=dbserver1 (3)
debezium.properties.mongodb.user=debezium (3)
debezium.properties.mongodb.password=dbz (3)
debezium.properties.database.whitelist=inventory (3)
debezium.properties.tasks.max=1 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
| 1 | 配置德贝齐姆源使用MongoDB连接器。 |
| 2 | 配置Debezium发动机以使用存储. |
| 3 | 与运行于 的 MongoDB 的连接本地主持:27017如德贝齐姆用户。 |
| 4 | debezium.io/docs/connectors/mongodb/#tasks |
| 5 | 包含变更事件值模式来源记录事件。 |
| 6 | 使变形事件平坦化成为可能。 |
你也可以运行DebeziumDatabasesIntegrationTest#mongodb()使用这个MongoDB配置。
SQL Server
开始一个SQLserver来自Debezium/example-postgres:1.0Docker 镜像:
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
Populate with sample data form debezium SqlServer tutorial:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
使用以下属性将 Debezium 源代码与 SQLServer 连接:
debezium.properties.connector.class=io.debezium.connector.sqlserver.SqlServerConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=sa (4)
debezium.properties.database.password=Password! (4)
debezium.properties.database..dbname=testDB (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=1433 (4)
| 1 | 配置德贝齐姆源使用 SqlServerConnector。 |
| 2 | 配置Debezium发动机以使用存储国营商店。 |
| 3 | 元数据用于识别和分发进入事件。 |
| 4 | 连接到运行于 的 SQL Server本地主持:1433如南非用户。 |
你也可以运行DebeziumDatabasesIntegrationTest#sqlServer()使用该 SqlServer 配置。
神谕
从localhost启动Oracle可访问,并按照Debezium Vagrant设置中描述的配置、用户和授权设置
填充示例数据表 Debezium Oracle 教程:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
5.2. 文件来源
该应用程序轮询目录,并将新文件或其内容发送到输出通道。 文件源默认以字节数组的形式提供文件内容。 不过,这可以通过 --file.supplier.mode 选项进行自定义:
-
ref 提供 java.io.file 引用
-
行 会逐行拆分文件,并为每行发送一条新消息
-
目录 默认。以字节数组的形式提供文件内容
使用--file.supplier.mode=lines你也可以提供额外的选项--file.supplier.withMarkers=true.
如果设置为 true,底层 FileSplitter 会在实际数据之前和之后发出额外的文件起始和结束标记消息。
这两个额外标记消息的有效载荷为FileSplitter.FileMarker.withMarkers 选项如果未明确设置,默认为 false。
5.3. FTP源
该源应用程序支持使用FTP协议传输文件。
文件是从远程目录当地应用部署的目录。源端发出的消息默认以字节数组形式提供。不过,这可以通过--模式选择:
-
裁判 提供
java.io.file参考 -
线 会逐行拆分文件,并为每行发送一条新消息
-
内容 默认。以字节数组形式提供文件内容
使用--模式=线你也可以提供额外的选项--withMarkers=真. 如果设置为true, 基础文件分拆器在实际数据之前和之后,会发送额外的文件开头和结束标记消息。这两个额外标记消息的有效载荷类型为FileSplitter.FileMarker. 选项withMarkers默认为false如果没有明确设置。
另见元数据存储选项,了解可能的共享持久存储配置,以防止重启时出现重复消息。
5.3.1. 输入
N/A(从FTP服务器获取文件)。
5.3.2. 输出
模式 = 目录
头:
-
内容类型:application/octet-stream -
file_originalFile: <java.io.File> -
file_name:<文件名>
有效载荷:
一个字节[]装满了文件内容。
模式 = 线
头:
-
内容类型:文本/纯文字 -
file_orginalFile:<java.io.File> -
file_name:<文件名> -
correlationId: <UUID>(每行都一样) -
序列号:<n> -
序列大小:0(行数直到文件被读取时才知道)
有效载荷:
一个字符串每行。
第一行可以选择性地在前加一条带有开始标记有效载荷。
最后一行后面可选择性地跟一条带有结束标记有效载荷。
标记的存在和格式由以下带标记和markers-json性能。
模式 = 参考
头:
没有。
有效载荷:
一个java.io.file对象。
5.3.4. 示例
java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
--ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo
5.4. http 源代码
一个源应用程序,监听HTTP请求并将正文作为消息载荷发出。
如果内容类型匹配文本/*或application/json,有效载荷将是字符串,
否则,有效载荷将是一个字节数组。
5.4.1. 有效载荷:
如果内容类型匹配文本/*或application/json
-
字符串
如果内容类型不匹配文本/*或application/json
-
字节数组
5.5. JDBC来源
该源从RDBMS轮询数据。
本资料完全基于DataSourceAutoConfiguration,更多信息请参见 Spring Boot JDBC 支持。
5.5.1. 有效载荷
-
Map<String,对象>什么时候jdbc.split == 真(默认) 和List<Map<String, Object>>否则
5.7. 阿帕奇卡夫卡源
该模块接收Apache Kafka的消息。
5.7.1. 期权
卡夫卡资料有以下选项:
(请参阅 Spring Boot 文档中关于 Spring for Apache Kafka 配置属性的相关内容)
5.8. 负载发生源
一个发送生成数据并分发到流的源。
5.8.1. 期权
负载发生源有以下选项:
- load-generator.generate-timestamp
-
时间戳是否生成。(布尔值,默认:
false) - load-generator.message-count
-
消息数量。(整数,默认:
1000) - load-generator.message-size
-
消息大小。(整数,默认:
1000) - 负载生成器。生产者
-
生产者的数量。(整数,默认:
1)
5.10. MongoDB 源
该来源来自MongoDB的数据轮询。
本资料完全基于MongoDataAutoConfiguration,更多信息请参阅 Spring Boot MongoDB 支持。
5.12. RabbitMQ 来源
“rabbit”源能够接收来自RabbitMQ的消息。
队列必须在流部署前就已存在;它们不会自动生成。你可以用 RabbitMQ 的网页界面轻松创建队列。
5.12.1. 输入
无
5.12.2. 输出
有效载荷
-
字节[]
5.12.3. 选项
兔子资源有以下选项:
另请参阅 Spring Boot 文档,了解代理连接和监听者属性的附加属性。
关于重试的说明
使用默认的ackMode(AUTO)和requeue(true)选项时,失败的消息传递将被重新尝试 无限期。 由于兔源处理不多,源本身故障的风险较小,除非下游粘结 剂由于某些原因没有连接。将 requeue 设置为 false 会导致消息在第一次尝试时被拒绝(并且可能会发送到死信如果代理配置为 Exchange/Queue)。enableRetry 选项允许配置重试参数,使失败的消息传递可以被重试,当重试用尽时,最终丢弃(或死字母)。在重试间隔内,传递线程会被暂停。重试选项有 enableRetry、maxAttempts、initialRetryInterval、retryMultiplier 和 maxRetryInterval。MessageConversionException 传递失败的消息永远不会被重试;假设如果消息第一次尝试无法转换,后续尝试也会失败。此类消息会被丢弃(或死字母处理)。 |
5.12.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
你可以在这里找到相应的活页夹项目。 然后你可以把光盘放进某个文件夹里,然后构建它:
$ ./mvnw clean package
5.12.5. 示例
java -jar rabbit-source.jar --rabbit.queues=
5.13. 亚马逊S3来源
该源应用支持使用亚马逊S3协议传输文件。文件传输来自远程目录(S3桶)映射到当地应用程序部署的目录。
源端发出的消息默认以字节数组形式提供。不过,这也可以通过以下方式进行自定义--模式选择:
-
裁判 提供
java.io.file参考 -
线 会逐行拆分文件,并为每行发送一条新消息
-
内容 默认。以字节数组形式提供文件内容
使用--模式=线你也可以提供额外的选项--withMarkers=真. 如果设置为true, 基础文件分拆器在实际数据之前和之后,会发送额外的文件开头和结束标记消息。这两个额外标记消息的有效载荷类型为FileSplitter.FileMarker. 选项withMarkers默认为false如果没有明确设置。
另见元数据存储选项,了解可能的共享持久存储配置,以防止重启时出现重复消息。
5.13.1. 模式 = 线
头:
-
内容类型:文本/纯文字 -
file_orginalFile:<java.io.File> -
file_name:<文件名> -
correlationId: <UUID>(每行都一样) -
序列号:<n> -
序列大小:0(行数直到文件被读取时才知道)
有效载荷:
一个字符串每行。
第一行可以选择性地在前加一条带有开始标记有效载荷。
最后一行后面可选择性地跟一条带有结束标记有效载荷。
标记的存在和格式由以下带标记和markers-json性能。
5.13.2. mode = 参考
头:
没有。
有效载荷:
一个java.io.file对象。
5.13.4. 亚马逊AWS常见选项
Amazon S3 源代码(和所有其他 Amazon AWS 应用一样)基于 Spring Cloud AWS 项目作为基础,其自动配置类由 Spring Boot 自动使用。请参阅他们关于必需且有用的自动配置属性的文档。
5.13.5. 示例
java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines
5.14. SFTP来源
该源应用程序支持使用SFTP协议传输文件。文件从传输远程目录当地应用部署的目录。源端发出的消息默认以字节数组形式提供。不过,这可以通过--模式选择:
-
裁判 提供
java.io.file参考 -
线 会逐行拆分文件,并为每行发送一条新消息
-
内容 默认。以字节数组形式提供文件内容
使用--模式=线你也可以提供额外的选项--withMarkers=真. 如果设置为true, 基础文件分拆器在实际数据之前和之后,会发送额外的文件开头和结束标记消息。这两个额外标记消息的有效载荷类型为FileSplitter.FileMarker. 选项withMarkers默认为false如果没有明确设置。
看SFTP-提供商用于高级配置选项。
另见元数据存储选项,了解可能的共享持久存储配置,以防止重启时出现重复消息。
5.14.1. 输入
N/A(从SFTP服务器获取文件)。
5.14.2. 输出
模式 = 目录
头:
-
内容类型:application/octet-stream -
file_name:<文件名> -
file_remoteFileInfo <文件元数据> -
file_remoteHostPort:<host:port> -
file_remoteDirectory:<相对路径> -
file_remoteFile:<文件名> -
sftp_selectedServer:<服务器密钥>(如果是多源)
有效载荷:
一个字节[]装满了文件内容。
模式 = 线
头:
-
内容类型:文本/纯文字 -
file_name:<文件名> -
correlationId: <UUID>(每行都一样) -
序列号:<n> -
序列大小:0(行数直到文件被读取时才知道) -
file_marker :<文件标记>(如果启用了带标记)
有效载荷:
一个字符串每行。
第一行可以选择性地在前加一条带有开始标记有效载荷。
最后一行后面可选择性地跟一条带有结束标记有效载荷。
标记的存在和格式由以下带标记和markers-json性能。
模式 = 参考
头:
-
file_remoteHostPort:<host:port> -
file_remoteDirectory:<相对路径> -
file_remoteFile:<文件名> -
file_originalFile:<本地文件的绝对路径> -
file_name <本地文件名> -
file_relativePath -
file_remoteFile:<远程文件名> -
sftp_selectedServer:<服务器密钥>(如果是多源)
有效载荷:
一个java.io.file对象。
5.14.4. 示例
java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.mode=lines --sftp.supplier.factory.host=sftpserver \
--sftp.supplier.factory.username=user --sftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo
5.16. TCP
这TCP源代码作为服务器,允许远程方连接并通过原始TCP套接字提交数据。
TCP是一种流式传输协议,需要某种机制来在线路上对消息进行帧。有若干解码器 可用,默认为“CRLF”,兼容Telnet。
TCP源应用程序产生的消息具有字节[]有效载荷。
5.16.1. 期权
5.16.2. 可用解码器
- CRLF(默认)
-
文本以回车(0x0d)结束,随后换行(0x0a)
- 如果
-
文本以换行符结束(0x0a)
- 零
-
以空字节(0x00)结束的文本
- STXETX
-
文本前置STX(0x02),结尾为ETX(0x03)
- 生
-
无结构——客户端通过关闭套接字表示完整消息
- 第一语言
-
数据前有一个一字节(无符号)长度字段(支持最多255字节)
- L2
-
数据前有一个两字节(无符号)长度字段(最多216-1字节)
- L4
-
数据前有一个四字节(带符号)长度字段(最多231-1字节)
5.17. 时间源
时间源会定期发出一个包含当前时间的字符串。
5.17.1. 期权
时间源有以下选项:
spring.integration.poller
- 克朗
-
Cron表达式用于投票。与“fixedDelay”和“fixedRate”互斥。(字符串,默认:
<没有>) - 固定延迟
-
投票延迟期。与“cron”和“fixedRate”互斥。(持续时间,默认:
<没有>) - 固定费率
-
投票率阶段。与“fixedDelay”和“cron”互斥。(持续时间,默认:
<没有>) - 初始延迟
-
初步延迟投票。申请了“固定延迟”和“固定费率”;被忽略,只用“克隆”。(持续时间,默认:
<没有>) - 每轮询最大消息数
-
每个轮询周期可轮询的最大消息数。(整数,默认:
<没有>) - 接收超时
-
投票通知要等多久?(持续时间,默认:
1)
5.18. 推特消息来源
反复检索过去30天内的直接消息(包括发送和接收),按倒时间顺序排序。
解除的消息被缓存(在元数据存储缓存)以防止重复。
默认情况下,内存中SimpleMetadataStore被使用。
这推特.消息.来源.计数控制返回消息的数量。
这spring.cloud.stream.poller属性控制消息轮询间隔。
必须与已使用API的速率限制保持一致
5.18.1. 期权
5.19. 推特搜索源
Twitter的标准搜索API(搜索/推文)允许对最近或热门推文的索引进行简单查询。这源对过去7天内发布的推文进行持续搜索。属于“公开”API集。
返回一组匹配特定查询的相关推文。
使用该spring.cloud.stream.poller属性用于控制连续搜索请求之间的间隔。速率限制 - 每30分钟窗口180个请求(例如~6转/分钟,~1个请求/10秒)
这推特.搜索查询属性允许按关键词查询,并按时间和地理位置筛选结果。
这推特.search.count和twitter.search.page根据搜索API控制结果分页。
注意:Twitter的搜索服务及其延伸的搜索API并非推文的详尽来源。并非所有推文都会被索引或通过搜索界面提供。
5.19.1. 期权
5.20. 推特流源
-
这
过滤 API返回与一个或多个过滤谓词匹配的公共状态。 多个参数支持使用单一连接流媒体API。 提示:该跟踪,跟随和地点场与运算符结合! 查询track=foo和跟随=1234回复 推文匹配测试或者由用户创建1234. -
这
示例API返回所有公共状态的一小部分随机样本。 默认访问级别返回的推文是相同的,所以如果两个不同的客户端连接到该端点,它们会看到相同的推文。
默认访问层允许最多400条关键词、5000个关注用户ID和25个0.1-360度位置框。
5.20.1. 期权
5.21. Websocket 源代码
这Websocket通过Web Socket生成消息的源代码。
5.21.1. 选项
5.21.2. 示例
为了验证websocket-source是否接收Websocket客户端的消息,可以使用以下简单的端到端设置。
第一步:开始写卡夫卡
第二步:部署websocket-source在某个具体端口,比如8080端口
步骤3:连接一个位于8080端口路径“/websocket”的Websocket客户端,发送一些消息。
你可以启动一个Kafka主机用户,看到那里的消息。
5.22. XMPP 来源
“xmpp”源使得从XMPP服务器接收消息成为可能。
5.22.1. 输入
无
5.22.2. 输出
有效载荷
-
字节[]
5.22.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
你可以在这里找到相应的活页夹项目。 然后你可以把光盘放进某个文件夹里,然后构建它:
$ ./mvnw clean package
5.22.5. 示例
java -jar xmpp-source.jar --xmpp.factory.host=localhost --xmpp.factory.port=5222 --xmpp.factory.user=jane --xmpp.factory.password=secret --xmpp.factory.service-name=localhost
5.23. ZeroMQ 源
“zeromq”源能够接收来自ZeroMQ的消息。
5.23.1. 输入
无
5.23.2. 输出
有效载荷
-
字节[]
5.23.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
你可以在这里找到相应的活页夹项目。 然后你可以把光盘放进某个文件夹里,然后构建它:
$ ./mvnw clean package
5.23.5. 示例
java -jar zeromq-source.jar --zeromq.supplier.connectUrl=tcp://server:port --zeromq.supplier.topics=
6. 处理器
6.1. 聚合处理器
聚合处理器使应用程序能够将收到的消息聚集成组并释放到输出目的地。
Java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
如果你想在RabbitMQ上运行Kafka,可以把它改成Rabbit。
6.1.1. 有效载荷
如果输入有效载荷是字节[]content-type 头是 JSON,则JsonBytesToMap函数尝试将该有效载荷反序列化为地图以便更好地表示聚合器函数输出的数据。
还有,这样的地图数据表示使得从下面提到的 SpEL 表达式中访问有效载荷内容变得容易。
否则(包括反序列化错误),输入有效载荷保持原样——目标应用配置将其转换为所需形式。
6.1.2. 选项
6.3. 滤波处理器
过滤处理器使应用程序能够检查进入的有效载荷,然后对其应用一个谓词,决定记录是否需要继续。
例如,如果输入的有效载荷类型为字符串如果你想过滤掉少于五个字符的内容,可以按照下面的方式运行过滤处理器。
Java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4
如果你想在RabbitMQ上运行Kafka,可以把它改成Rabbit。
6.3.1. 有效载荷
你可以将任意类型传递为有效载荷,然后对其应用 SpEL 表达式进行过滤。
如果输入类型是字节[]内容类型设置为文本/纯文字或application/json然后应用程序将字节[]到字符串.
6.3.2. 选项
6.4. Groovy Processor
一个对消息应用Groovy脚本的处理器。
6.4.1. 选项
groovy-processor处理器具备以下选项:
- groovy-processor.script
-
引用用于处理消息的脚本。(资源,默认:
<没有>) - groovy-processor.variables
-
变量绑定作为新行分隔串的名称-值对,例如 'foo=bar\n baz=car'。(属性,默认:
<没有>) - groovy-processor.variables-location
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认:
<没有>)
6.5. 头部增益处理器
使用头部增强器应用添加消息头部。
头部以新行分隔键值对的形式提供,其中键为头部名称,值为SpEL表达式。
例如--headers='foo=payload.someProperty \n bar=payload.otherProperty'.
6.6. Http请求处理器
一个处理器应用,向HTTP资源发出请求,并将响应体作为消息载荷发出。
6.6.1. 输入
头
任何必修的HTTP头必须通过头或头部表达式财产。 请参见下方示例。头部值也可用于构造:
-
请求主体在
身体表情财产。 -
HTTP方法在
http-方法表达式财产。 -
在
URL 表达式财产。
有效载荷
有效载荷默认作为POST请求的请求体,可以是任意Java类型。对于GET请求,它应是一个空的字符串。有效载荷还可以用于构造:
-
请求主体在
身体表情财产。 -
HTTP方法在
http-方法表达式财产。 -
在
URL 表达式财产。
底层Web客户端支持Jackson JSON序列化,必要时支持任何请求和响应类型。 这期望反应型属性String.class默认情况下,可以设置为应用类路径中的任意类。注意,用户自定义的有效载荷类型需要向你的 POM 文件添加必要的依赖。
6.6.2. 输出
头
没有任何 HTTP 消息头映射到出站消息。
有效载荷
原始输出对象是 ResponseEntity<?>其任意字段(例如,身体,头)或访问器方法(状态代码)可以作为回复表达式. 默认情况下,出站消息有效载荷是响应主体。注意 ResponseEntity(由表达式#root)默认情况下不能被Jackson反序列化,但可以被表示为哈希图.
6.6.3. 期权
http-request 处理器具备以下选项:
processors.adoc 中的未解决指令 - 包括:/home/runner/work/stream-applications/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/.. /.. /applications/processor/image-recognition-processor/README.adoc[标签=ref-doc]
processors.adoc 中的未解决指令 - 包括:/home/runner/work/stream-applications/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/.. /.. /applications/processor/object-detection-processor/README.adoc[tags=ref-doc]
processors.adoc 中的未解决指令 - include:/home/runner/work/stream-applications/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/.. /.. /applications/processor/semantic-segmentation-processor/README.adoc[tags=ref-doc]
6.7. 脚本处理器
处理器,利用脚本转换消息。脚本主体直接提供作为属性值。脚本的语言可以被指定(groovy/javascript/ruby/python)。
6.7.1. 期权
脚本处理器处理器具备以下选项:
- 脚本处理者.language
-
脚本属性中的文本语言。支持:groovy、javascript、ruby、python。(字符串,默认:
<没有>) - 脚本处理器.脚本
-
剧本文本。(字符串,默认:
<没有>) - 脚本处理器变量
-
变量绑定作为新行分隔串的名称-值对,例如 'foo=bar\n baz=car'。(属性,默认:
<没有>) - script-processor.variables-location
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认:
<没有>)
6.8. 分流处理器
分流器应用基于 Spring Integration 中同名的概念,允许将单条消息拆分为多个不同的消息。处理器使用一个函数,将留言<?>作为输入,然后产生列表<消息<?>作为基于各种属性的输出(见下文)。你可以使用SpEL表达式或分隔符来指定如何分割收到的消息。
6.8.1. 有效载荷
-
来袭有效载荷——
留言<?>
如果输入类型是字节[]内容类型设置为文本/纯文字或application/json然后应用程序将字节[]到字符串.
-
即将发射的有效载荷——
列表<消息<?>
6.8.2. 期权
6.9. 变换处理器
Transformer 处理器允许你基于 SpEL 表达式转换消息有效载荷结构。
这里有一个运行该应用的示例。
java -jar transform-processor-kafka-<version>.jar \
--spel.function.expression=payload.toUpperCase()
如果你想在RabbitMQ上运行Kafka,可以把它改成Rabbit。
6.9.1. 有效载荷
收到的消息可以包含任何类型的有效载荷。
6.9.2. 期权
6.10. Twitter 趋势和趋势位置处理器
能够返回热门话题或热门话题的位置的处理器。 这推特。趋势。趋势-查询类型属性允许选择查询类型。
6.10.2. 检索趋势位置
对于该模式集合推特。趋势。趋势-查询类型自趋势位置.
按地点获取完整或附近的热门话题列表。
如果纬度,经度只要处理器执行趋势可用API,并返回Twitter拥有热门话题信息的位置,则不包含参数。
如果纬度,经度参数前提是处理器执行趋势最近 API,并返回 Twitter 拥有趋势话题信息且最接近指定位置的位置。
响应是一个数组地点该信息编码了该地点的 WOEID 以及一些人类可读的信息,比如该位置所属的官方名称和国家。
6.10.3. 选项
7. 沉没
7.1. 卡桑德拉·辛克
该汇入应用会将收到的每条消息的内容写入Cassandra。
它期望收到 JSON String 的负载,并用其属性映射到表列。
7.1.1. 有效载荷
一个 JSON 字符串或字节数组,表示要持久化的实体(或实体列表)。
7.2. 分析汇
Sink 应用构建在 Analytics Consumer 之上,能够从输入消息中计算分析数据,并将分析数据作为指标发布给各种监控系统。它利用微米库在最流行的监控系统中提供统一的编程体验,并揭示了Spring表达式语言(SpEL)特性,用于定义如何从输入数据计算度量名称、值和标签。
分析汇可以生成两种指标类型:
米(如计数器或规范)由其唯一标识名称和尺寸(尺寸和标签这两个术语可以互换使用。)维度允许对特定命名的指标进行切片,以深入分析和推理数据。
作为度量唯一由其标识的名称和尺寸你可以为每个指标分配多个标签(e.g. key/值对),但不能随意更改这些标签!像Prometheus这样的监控系统如果同名指标使用不同的标签组,就会抱怨。 |
使用该analytics.name或analytics.name-expression属性设置了输出分析指标的名称。如果没有设置,指标名称默认为应用程序名称。
使用该analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>,属性,用于在你的指标中添加一个或多个标签。这TAG_NAME属性定义中使用的指标将作为标签名称出现。TAG_VALUE是SpEL表达式,能够动态计算来自接收消息的标签值。
这SpEL表达式使用头和有效载荷关键词用于访问消息的头部和有效载荷值。
你可以使用文字(例如:“固定价值”)以设置固定值的标签。 |
所有Stream Applications一出厂就支持三种最受欢迎的监控系统,波,普罗 米修斯和InfluxDB你可以声明式地启用它们。
你可以通过在分析汇应用。
请访问春季云数据流监控,获取详细的监控系统配置说明。以下简要内容可以帮助你开始。
-
要启用普罗米修斯电表注册表,请设置以下属性。
management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
-
要启用波前表注册表,请设置以下属性。
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
-
要启用 InfluxDB 电表注册表,请设置以下属性。
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
如果启用了数据流服务器监控,那么分析汇将重复使用提供的指标配置。 |
下图展示了如何分析汇帮助收集企业内部信息,用于股票交易所和实时管道。
7.2.1. 有效载荷
收到的消息可以包含任何类型的有效载荷。
7.2.2. 选项
7.3. 弹性搜索汇
一个能将文档索引到Elasticsearch的Sink。
这个 Elasticsearch sink 只支持索引 JSON 文档。
它从输入目的地获取数据,然后将其索引到 Elasticsearch。
输入数据可以是普通的 JSON 字符串,也可以是java.util.Map代表 JSON。
它也接受Elasticsearch提供的数据XContentBuilder.
不过这种情况很少见,因为中间件不太可能像XContentBuilder.
这主要用于直接调用消费者。
7.3.2. 运行该水槽的示例
-
摘自文件夹
弹性搜索汇:./mvnw 清洁封装 -
CD应用
-
cd 到正确的绑定器生成应用(Kafka 或 RabbitMQ)
-
./mvnw 清洁封装 -
确保你运行着Elasticsearch。例如,你可以用以下命令将其作为 docker 容器运行。
Docker run -d --name es762 -p 9200:9200 -e “discovery.type=single-node” elasticsearch:7.6.2 -
如果中间件(Kafka 或 RabbitMQ)还没运行,就启动它。
-
Java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing -
把一些JSON数据发送到中间件目标。例如:
{“foo”:“bar”} -
确认数据已被索引:
curl localhost:9200/testing/_search
7.5. FTP 汇
FTP sink 是一个简单的选项,可以将收到的消息推送文件到 FTP 服务器。
它使用了一个FTP-出站适配器因此,收到的消息可以是java.io.file对象,a字符串(文件内容)
或字节(文件内容也包括)
使用这个水槽需要用户名和密码才能登录。
默认情况下,Spring 集成会使用o.s.i.file.DefaultFileNameGenerator如果没有指定。DefaultFileNameGenerator将决定文件名
基于file_name在消息头,或者如果 的有效载荷消息已经是java.io.file那么它就会
使用该文件的原始名称。 |
7.5.1. 头部
-
file_name(见上文注释)
7.5.2. 有效载荷
-
java.io.file -
java.io.InputStream -
字节[] -
字符串
7.5.3. 输出
不行(写入FTP服务器)。
7.6. JDBC汇入
JDBC sink 允许你将进入的有效载荷持久化到关系数据库数据库中。
这jdbc.consumer.columns性质表示COLUMN_NAME[:EXPRESSION_FOR_VALUE]哪里EXPRESSION_FOR_VALUE(连同冒号)是可选的。
在这种情况下,该值通过生成的表达式进行计算,如有效载荷。COLUMN_NAME这样我们就能直接映射对象属性到表列。
例如,我们有一个像这样的JSON有效载荷:
{
"name": "My Name",
"address": {
"city": "Big City",
"street": "Narrow Alley"
}
}
所以,我们可以将其代入到表中,且名称,城市和街结构基于以下构型:
--jdbc.consumer.columns=name,city:address.city,street:address.street
该汇入支持批处理插入,前提是底层JDBC驱动支持。
批量插入通过批次大小和闲置超时性能:
收到的消息会被聚合到批次大小消息存在后,作为批次插入。
如果闲置超时毫秒过去后没有新消息,即使聚合批次小于批次大小,限制最大延迟。
该模块还使用 Spring Boot 的 DataSource 支持来配置数据库连接,因此有以下属性spring.datasource.url 等等,申请。 |
7.6.1. 示例
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names \
--jdbc.consumer.columns=name \
--spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.6.2. 有效载荷
7.8. 木头水槽
这日志Sink 使用应用日志器输出数据进行检查。
请理解这一点日志Sink 使用无类型处理程序,这会影响实际日志的执行方式。
这意味着如果内容类型是文本,那么原始有效载荷字节会被转换为字符串,否则原始字节会被记录。
更多信息请参见用户指南。
7.9. MongoDB 汇入
该汇入应用将输入数据导入MongoDB。
该应用完全基于MongoDataAutoConfiguration,更多信息请参阅 Spring Boot MongoDB 支持。
7.9.1. 输入
有效载荷
-
任何POJO
-
字符串 -
字节[]
7.11. Pgcopy Shang
一个模块,通过 PostgreSQL COPY 命令将其收到的有效载荷写入 RDBMS。
7.11.1. 输入
头
有效载荷
-
任何
列表达式会根据消息进行评估,表达式通常只兼容一种类型(如 Map 或 bean 等)。
7.11.2. 输出
无
7.11.3. 期权
jdbc 汇具备以下选项:
该模块还使用 Spring Boot 的 DataSource 支持来配置数据库连接,因此有以下属性spring.datasource.url 等等,申请。 |
7.11.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
你可以在这里找到相应的活页夹项目。 然后你可以把光盘放进其中一个文件夹里,然后构建它:
$ ./mvnw clean package
为了运行集成测试,可以在localhost上启动PostgreSQL数据库:
docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest
7.11.5. 示例
java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.14. 路由水槽
该应用程序将消息路由到指定信道。
7.14.1. 选项
路由器水槽有以下选项:
- router.default-output-binding
-
发送无法路由的消息。(字符串,默认:
<没有>) - router.destination-mappings
-
目的地映射作为新行分隔串的名称-值对,例如 'foo=bar\n baz=car'。(属性,默认:
<没有>) - router.expression
-
将应用到消息上的表达式,以确定要路由到哪个信道。注意,文本、json 或 xml 等内容类型的有效载荷线格式是字节[],而非字符串!请参阅如何处理字节数组有效载荷内容的文档。(表达式,默认:
<没有>) - router.refresh-delay
-
如果有,MS中多久检查一次脚本变动;<0表示不要刷新。(整数,默认:
60000) - router.resolution-required(需要)
-
是否需要通道分辨率。(布尔值,默认:
false) - router.script
-
返回通道或通道映射分辨率键的groovy脚本位置。(资源,默认:
<没有>) - router.variables
-
变量绑定作为新行分隔串的名称-值对,例如 'foo=bar\n baz=car'。(属性,默认:
<没有>) - router.variables-location
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认:
<没有>)
这个路由器水槽基于流桥Spring Cloud Stream 的 API,因此可以根据需要创建目的地。
在这种情况下,adefaultOutputBinding只有当密钥不包含于 时才能访问目的地地图.
这必要分辨率 = 真忽略defaultOutputBinding如果没有映射和相应的绑定,则抛出异常。 |
你可以用spring.cloud.stream.dynamic目的地财产。
默认情况下,所有已解析的目的地都会被动态绑定;如果该属性有逗号分隔的目的地名称列表,则只有这些目标会被绑定。
如果消息最终解析到不在该列表中的目的地,则会被路由到defaultOutputBinding,也必须出现在列表中。
这目的地地图用于将评估结果映射到实际的目的地名称。
7.14.2. 基于特殊语言的路由
该表达式会根据消息进行评估,返回信道名称或信道名称映射的密钥。
欲了解更多信息,请参阅 Spring 集成参考手册“配置通用路由器”章节中的“路由器与 Spring 表达式语言(SpEL)”子章节。
7.14.3. 基于Groovy的路由
除了 SpEL 表达式,也可以使用 Groovy 脚本。我们在文件系统里创建一个Groovy脚本,地址为“file:/my/path/router.groovy”,或者“classpath:/my/path/router.groovy”:
println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
return "foo"
}
else {
return "bar"
}
如果你想把变量值传递给脚本,可以用变量选项静态绑定值,或者选择用propertiesLocation选项将路径传递到包含绑定的properties文件。 文件中的所有属性都将作为变量向脚本开放。你可以同时指定变量和 propertiesLocation,在这种情况下,任何作为变量提供的重复值都会覆盖 propertiesLocation 中的值。 注意,payload 和 header 是隐式绑定的,可以让你访问消息中包含的数据。
更多信息请参见 Spring Integration 参考手册《Groovy Support》。
7.16. 亚马逊S3沉没
这个汇入应用支持将对象转移到亚马逊S3桶。
文件、有效载荷(以及递归的目录)会被传输到远程目录(S3桶)映射到当地应用程序部署的目录。
该汇入接收的消息必须包含以下有效载荷如:
-
文件,包括递归上传目录; -
输入流; -
字节[]
7.16.1. 期权
S3水槽有以下选项:
目标生成的应用程序基于AmazonS3SinkConfiguration可以通过以下方式增强S3MessageHandler.UploadMetadataProvider和/或S3ProgressListener(进步听众),这些 被注入到S3MessageHandler豆。
详情请参见 Spring 集成 AWS 支持。
7.16.2. 亚马逊AWS常见选项
Amazon S3 Sink(以及所有其他 Amazon AWS 应用)基于 Spring Cloud AWS 项目作为基础,并实现了自动配置 Spring Boot 会自动使用职业。 请参阅他们关于必需且有用的自动配置属性的文档。
其中一些是关于AWS凭证的:
-
spring.cloud.aws.credentials.accessKey
-
spring.cloud.aws.credentials.secretKey
-
spring.cloud.aws.credentials.instanceProfile
-
spring.cloud.aws.credentials.profileName
-
spring.cloud.aws.credentials.profilePath
其他的则是AWS的地区定义:
-
cloud.aws.region.auto
-
cloud.aws.region.static
例子
java -jar s3-sink.jar --s3.bucket=/tmp/bar
7.17. SFTP沉没
SFTP sink 是一个简单的选项,可以将收到的消息推送文件到 SFTP 服务器。
它使用了一个SFTP-出站适配器因此,收到的消息可以是java.io.file对象,a字符串(文件内容)
或字节(文件内容也包括)
使用这个水槽需要用户名和密码才能登录。
默认情况下,Spring 集成会使用o.s.i.file.DefaultFileNameGenerator如果没有指定。DefaultFileNameGenerator将决定文件名
基于file_name在消息头,或者如果 的有效载荷消息已经是java.io.file那么它就会
使用该文件的原始名称。 |
在配置sftp.factory.known-hosts-expressionoption,评估的根对象是应用上下文,例如:sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.
7.17.1. 输入
头
-
file_name(见上文注释)
有效载荷
-
java.io.file -
java.io.InputStream -
字节[] -
字符串
7.17.2. 输出
不行(写入SFTP服务器)。
7.18. TCP 吸收
该模块通过编码器向TCP写入消息。
TCP是一种流式传输协议,需要某种机制来在线路上对消息进行帧。有若干编码器 可用,默认是“CRLF”。
7.18.2. 可用编码器
- CRLF(默认)
-
文本以回车(0x0d)结束,随后换行(0x0a)
- 如果
-
文本以换行符结束(0x0a)
- 零
-
以空字节(0x00)结束的文本
- STXETX
-
文本前置STX(0x02),结尾为ETX(0x03)
- 生
-
无结构——客户端通过关闭套接字表示完整消息
- 第一语言
-
数据前有一个一字节(无符号)长度字段(支持最多255字节)
- L2
-
数据前有一个两字节(无符号)长度字段(最多216-1字节)
- L4
-
数据前有一个四字节(带符号)长度字段(最多231-1字节)
7.20。推特消息汇
从认证用户向指定用户发送直接消息。
需要一个 JSON POST 正文,并且内容类型将 首部设置为application/json.
| 当用户收到消息时,在24小时内最多可发送5条回复。 每收到一条消息,都会重置24小时窗口和分配的5条消息。 在24小时窗口内发送第6条消息或在24小时窗口外发送消息将计入速率限制。 这种行为只在使用 POST direct_messages/事件/新端点时出现。 |
SpEL表达式用于计算输入消息的请求参数。
7.21. 推特更新汇报
更新认证用户当前的文本(例如推文)。
| 每次更新尝试时,更新文本都会与认证用户最近的推文进行比较。 任何可能导致重复的尝试都会被阻止,导致403错误。 用户不能连续提交相同的文本两次。 |
虽然API没有速率限制,但用户一次能创建的推文数量是有限的。 标准API的更新限制是3小时内300次。 如果用户发布的更新次数达到当前允许的限制,该方法将返回HTTP 403错误。
你可以在这里找到更新API的详细信息:developer.twitter.com/en/docs/tweets/post-and-engage/api-reference/post-statuses-update
7.21.1. 期权
7.22. 波前汇陷
Wavefront 汇接收消息 <?>,将其转换为 Wavefront 数据格式的指标,并直接发送给 Wavefront 或 Wavefront 代理。
支持常见的ETL用例,即需要清理、转换现有(历史)指标数据并存储在Wavefront中以便进一步分析。
7.23. Websocket Sink
一个简单的Websocket Sink实现。
7.23.2. 示例
为了验证websocket-sink是否接收来自其他Spring-Cloud-Stream应用的消息,你可以使用 遵循简单的端到端设置。
步骤1:开始Rabbitmq
步骤2:部署时间源
步骤3:部署Websocket-sink
最后启动一个websocket-sink跟踪模式,这样你就能看到由时间源日志中:
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE
你应该会在你启动WebsocketSink的控制台里看到日志消息,内容如下:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
7.23.3. 执行器
有一个端点你可以用它来访问最后一个n发送和接收的信息。你必须
通过提供--endpoints.websocketconsumertrace.enabled=true.默认情况下,它通过host:port/websocketConsumertrace.以下是示例输出:
[
{
"timestamp": 1445453703508,
"info": {
"type": "text",
"direction": "out",
"id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
"payload": "2015-10-21 20:54:33"
}
},
...
{
"timestamp": 1445453703506,
"info": {
"type": "text",
"direction": "out",
"id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
"payload": "2015-10-21 20:54:32"
}
}
]
还有一个简单的HTML页面,文本区域里可以看到转发的消息。你可以访问
它直接通过host:port在你的浏览器里。
7.24. XMPP 沉没
“xmpp”汇入器允许向XMPP服务器发送消息。
7.24.1. 输入
-
字节[]
7.24.2. 输出
有效载荷
无
7.24.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
你可以在这里找到相应的活页夹项目。 然后你可以把光盘放进某个文件夹里,然后构建它:
$ ./mvnw clean package
7.25. ZeroMQ 汇入
“zeromq”接收器使消息能够发送到ZeroMQ套接字。
7.25.1. 输入
-
字节[]
7.25.2. 输出
有效载荷
无
7.25.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
你可以在这里找到相应的活页夹项目。 然后你可以把光盘放进某个文件夹里,然后构建它:
$ ./mvnw clean package
7.25.5. 示例
java -jar zeromq-sink.jar --zeromq.consumer.connectUrl=tcp://server:port --zeromq.consumer.topic=