
Confluent Platform: ksqlDB Real-Time Stream Processing (quick start)



1, Introduction to Confluent Platform

Features

2, Quick deployment: quick start

Confluent Platform quickstart: https://docs.confluent.io/platform/current/quickstart/ce-quickstart.html

Category                    Command
Start services              confluent local services start
Stop services               confluent local services stop
Destroy services and data   confluent local destroy

a, Extract, install, and start the services

#1, Extract the archive and configure environment variables
[root@c7-docker confluent-6.1.1]# export CONFLUENT_HOME=/opt/confluent-6.1.1/
[root@c7-docker confluent-6.1.1]# echo 'export CONFLUENT_HOME=/opt/confluent-6.1.1/' >>  /etc/profile
[root@c7-docker confluent-6.1.1]# export PATH=$PATH:$CONFLUENT_HOME/bin
[root@c7-docker confluent-6.1.1]# echo 'export PATH=$PATH:$CONFLUENT_HOME/bin' >> /etc/profile

#2, Install the Kafka connector kafka-connect-datagen
# Connector docs:  https://docs.confluent.io/home/connect/overview.html
# Plugin install directory: /opt/confluent-6.1.1/share/confluent-hub-components
[root@c7-docker confluent-6.1.1]#  grep 'plugin.path' /opt/confluent-6.1.1/etc/ -r
/opt/confluent-6.1.1/etc/kafka/connect-standalone.properties:# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
/opt/confluent-6.1.1/etc/kafka/connect-standalone.properties:plugin.path=/usr/share/java,/opt/confluent-6.1.1/share/confluent-hub-components
/opt/confluent-6.1.1/etc/kafka/connect-distributed.properties:# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
/opt/confluent-6.1.1/etc/kafka/connect-distributed.properties:plugin.path=/usr/share/java,/opt/confluent-6.1.1/share/confluent-hub-components
/opt/confluent-6.1.1/etc/ksqldb/connect.properties:# plugin.path=
/opt/confluent-6.1.1/etc/schema-registry/connect-avro-standalone.properties:# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
/opt/confluent-6.1.1/etc/schema-registry/connect-avro-standalone.properties:plugin.path=share/java,/opt/confluent-6.1.1/share/confluent-hub-components
/opt/confluent-6.1.1/etc/schema-registry/connect-avro-distributed.properties:# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
/opt/confluent-6.1.1/etc/schema-registry/connect-avro-distributed.properties:plugin.path=share/java,/opt/confluent-6.1.1/share/confluent-hub-components
/opt/confluent-6.1.1/etc/kafka-connect-replicator/replicator-connect-distributed.properties:#plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
/opt/confluent-6.1.1/etc/kafka-connect-replicator/replicator-connect-standalone.properties:#plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors

# To build and install from source instead: git clone https://github.com/confluentinc/kafka-connect-datagen.git
# git checkout v0.4.0
# mvn clean package
# confluent-hub install target/components/packages/confluentinc-kafka-connect-datagen-0.4.0.zip 
### Usage: confluent-hub install : install a component from either Confluent Hub or from a local file
[root@c7-docker confluent-6.1.1]# confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
Running in a "--no-prompt" mode
Implicit acceptance of the license below:
Apache License 2.0
https://www.apache.org/licenses/LICENSE-2.0
Downloading component Kafka Connect Datagen 0.5.0, provided by Confluent, Inc. from Confluent Hub and installing into /opt/confluent-6.1.1//share/confluent-hub-components
Adding installation directory to plugin path in the following files:
  /opt/confluent-6.1.1//etc/kafka/connect-distributed.properties
  /opt/confluent-6.1.1//etc/kafka/connect-standalone.properties
  /opt/confluent-6.1.1//etc/schema-registry/connect-avro-distributed.properties
  /opt/confluent-6.1.1//etc/schema-registry/connect-avro-standalone.properties
Completed
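
# To sanity-check the install, list the plugin directory; once Connect is
# running (step 4 below), the worker's REST API offers a second check.
# A minimal sketch, assuming Connect's default REST port 8083:
ls $CONFLUENT_HOME/share/confluent-hub-components
# After Connect is up, confirm the worker actually loaded the plugin:
curl -s http://localhost:8083/connector-plugins | grep -i datagen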

#3, Edit the config files (ksqlDB listens on localhost:8088 by default; without this change, SQL queries issued from a remote Control Center session at http://192.168.56.7:9021/ fail)
[root@c7-docker confluent-6.1.1]# cd etc/
[root@c7-docker etc]# ls
cli                       confluent-control-center-fe  confluent-metadata-service  kafka                     ksqldb
confluent-common          confluent-hub-client         confluent-rebalancer        kafka-connect-replicator  rest-utils
confluent-control-center  confluent-kafka-mqtt         confluent-security          kafka-rest                schema-registry
[root@c7-docker etc]# grep ':8088' * -r
confluent-control-center/control-center.properties:#confluent.controlcenter.ksql.ksqlDB.url=http://localhost:8088
confluent-control-center/control-center.properties:confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088
confluent-control-center/control-center-minimal.properties:#confluent.controlcenter.ksql.ksqlDB.url=http://localhost:8088
confluent-control-center/control-center-minimal.properties:confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088
confluent-control-center/control-center-dev.properties:#confluent.controlcenter.ksql.ksqlDB.url=http://localhost:8088
confluent-control-center/control-center-dev.properties:confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088
confluent-control-center/control-center-production.properties:#confluent.controlcenter.ksql.ksqlDB.url=http://ksql:8088
confluent-control-center/control-center-production.properties:confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088
ksqldb/ksql-server.properties:#listeners=http://0.0.0.0:8088
ksqldb/ksql-server.properties:listeners=http://192.168.56.117:8088
ksqldb/ksql-server.properties:# listeners=http://[::]:8088
ksqldb/ksql-server.properties:# listeners=https://0.0.0.0:8088
ksqldb/ksql-production-server.properties:#listeners=http://0.0.0.0:8088
ksqldb/ksql-production-server.properties:listeners=http://192.168.56.117:8088
ksqldb/ksql-production-server.properties:# listeners=http://[::]:8088
ksqldb/ksql-production-server.properties:# listeners=https://0.0.0.0:8088
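
# These edits can also be scripted. A minimal sketch, assuming the stock
# commented-out lines shown above are still present (substitute your own
# host IP for 192.168.56.117):
sed -i 's|^#listeners=http://0.0.0.0:8088|listeners=http://192.168.56.117:8088|' \
    $CONFLUENT_HOME/etc/ksqldb/ksql-server.properties
# Point Control Center at the same ksqlDB address:
sed -i 's|^#confluent.controlcenter.ksql.ksqlDB.url=.*|confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088|' \
    $CONFLUENT_HOME/etc/confluent-control-center/control-center.properties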

#4, Start the services and check the logs
[root@c7-docker etc]# confluent local services start
The local commands are intended for a single-node development environment only,
NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.007829
Starting ZooKeeper
ZooKeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting ksqlDB Server
ksqlDB Server is [UP]
Starting Control Center
Control Center is [UP]
[root@c7-docker etc]# ls /tmp/confluent.007829
connect  control-center  kafka  kafka-rest  ksql-server  schema-registry  zookeeper
# Data and log files for each service:
[root@c7-docker lib]# ls /tmp/confluent.007829/connect/
connect.properties  connect.stdout  data  logs
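
# Two quick checks after startup ('confluent local services status' is part
# of the same CLI used above; connect.stdout is the file listed just above):
confluent local services status
# Follow the Connect worker's stdout when debugging connectors:
tail -f /tmp/confluent.007829/connect/connect.stdout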

b, Control Center UI: create topics and generate test data

#a, Generate test data for the pageviews topic in AVRO format:
{
  "name": "datagen-pageviews",
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "kafka.topic": "pageviews",
  "max.interval": "100",
  "quickstart": "pageviews"
}



#b, Generate test data for the users topic in AVRO format:
{
  "name": "datagen-users",
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "kafka.topic": "users",
  "max.interval": "1000",
  "quickstart": "users"
}
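
# If you prefer the command line to the Control Center UI, the same
# connectors can be created through the Kafka Connect REST API. A sketch for
# the pageviews connector, assuming Connect's default REST port 8083 (the
# settings above move under a "config" key):
curl -s -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "datagen-pageviews",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "pageviews",
    "max.interval": "100",
    "quickstart": "pageviews"
  }
}'
# List registered connectors to confirm:
curl -s http://localhost:8083/connectors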

c, Using ksqlDB: inspect data / create tables and streams

ksql doc : https://docs.ksqldb.io/en/latest/concepts/streams/

######################### Creating tables/streams with ksqlDB:
#1, Create a stream (the underlying Kafka topic is created automatically if it doesn't exist)
# ksql> CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
#       WITH (kafka_topic='locations', value_format='json', partitions=1);

# Parameter notes:
#kafka_topic - 
#	Name of the Kafka topic underlying the stream. In this case it will be automatically created because it doesn't exist yet, but streams may also be created over topics that already exist.
#value_format - 
#	Encoding of the messages stored in the Kafka topic. For JSON encoding, each row will be stored as a JSON object whose keys/values are column names/values.
#partitions - 
#	Number of partitions to create for the locations topic. Note that this parameter is not needed for topics that already exist.

#2, Run a push query over the stream: coordinates within 5 miles of the given point
# SELECT * FROM riderLocations WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
#   Then open a new session, insert the rows below, and watch the query above emit them in real time
# INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
# INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);

# ksql>  create stream users( rowkey int key, username varchar) with( KAFKA_TOPIC='users',VALUE_FORMAT='JSON');
# Message
#----------------
# Stream created
#----------------
#ksql> insert into users(username) values('a');
#ksql> insert into users(username) values('b');
#ksql> select 'hello,'+ username as greeting from users emit changes;
#+----------------------------------------------------------------------------------------------------------------------------------------------------------+
#|GREETING                                                                                                                                                  |
#+----------------------------------------------------------------------------------------------------------------------------------------------------------+
#|hello,b                                                                                                                                                   |
#|hello,a                                                                                                                                                   |

# a stream for the pageviews topic : 
ksql> CREATE STREAM pageviews WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');

# a table for the users topic:       
ksql> CREATE TABLE  users (id VARCHAR PRIMARY KEY) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');

ksql> set 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
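# 'earliest' makes subsequent queries read each topic from the beginning,
# rather than only from newly arriving records; UNSET reverts it.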
ksql>  show topics;
 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 pageviews                   | 1          | 1
 users                       | 1          | 1
---------------------------------------------------------------
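
# Before defining queries over a topic, it can help to peek at the raw
# records; ksqlDB's PRINT statement does this without creating any stream
# or table (with no LIMIT, Ctrl-C stops it):
ksql> PRINT 'pageviews' FROM BEGINNING LIMIT 3;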

######################### 1, Non-persistent queries
ksql> SELECT * FROM pageviews EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|VIEWTIME                                          |USERID                                            |PAGEID                                            |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|129321                                            |User_1                                            |Page_40                                           |
Limit Reached
Query terminated


ksql> SELECT * from  users EMIT CHANGES LIMIT 1;
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|ID                           |REGISTERTIME                 |USERID                       |REGIONID                     |GENDER                       |
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|User_5                       |1489800608800                |User_5                       |Region_5                     |MALE                         |
Limit Reached
Query terminated

######################### 2, Persistent query (as a stream): filter the pageviews stream down to female users and persist the result to the pageviews_female topic
ksql> SELECT users.id AS userid, pageid, regionid
  FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
  WHERE gender = 'FEMALE'
  EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|USERID                                            |PAGEID                                            |REGIONID                                          |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|User_3                                            |Page_89                                           |Region_7                                          |
Limit Reached
Query terminated


ksql> CREATE STREAM pageviews_female AS 
  SELECT users.id AS userid, pageid, regionid
  FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
  WHERE gender = 'FEMALE'
  EMIT CHANGES;

ksql> SELECT * from  pageviews_female EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|USERID                                            |PAGEID                                            |REGIONID                                          |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|User_8                                            |Page_97                                           |Region_4                                          |
Limit Reached
Query terminated
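
# Each CREATE STREAM ... AS SELECT registers a persistent query that keeps
# running on the server, writing to a backing Kafka topic (named
# PAGEVIEWS_FEMALE here, since no KAFKA_TOPIC was specified). Both are easy
# to confirm:
ksql> SHOW QUERIES;
ksql> SHOW TOPICS;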

######################### 3, Persistent query: keep only rows whose regionid ends in 8 or 9 and persist the result to the pageviews_enriched_r8_r9 topic
ksql>  SELECT * FROM pageviews_female
  WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
  EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|USERID                                            |PAGEID                                            |REGIONID                                          |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|User_9                                            |Page_95                                           |Region_8                                          |
Limit Reached
Query terminated


ksql> CREATE STREAM pageviews_female_like_89
  WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', VALUE_FORMAT='AVRO')
  AS SELECT * FROM pageviews_female
  WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
  EMIT CHANGES;

ksql> SELECT * from  pageviews_female_like_89 EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|USERID                                            |PAGEID                                            |REGIONID                                          |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|User_8                                            |Page_67                                           |Region_8                                          |
Limit Reached
Query terminated

######################### 4, Persistent query: count rows in the pageviews stream per REGIONID and GENDER in 30-second tumbling windows, keep groups with count > 1, and persist the result as a table (backed by the pageviews_regions topic)
ksql> SELECT gender, regionid, COUNT(*) AS numusers
  FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
  WINDOW TUMBLING (SIZE 30 SECOND)
  GROUP BY gender, regionid
  HAVING COUNT(*) > 1 EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|GENDER                                            |REGIONID                                          |NUMUSERS                                          |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|FEMALE                                            |Region_6                                          |2                                                 |
Limit Reached
Query terminated


ksql> CREATE TABLE pageviews_regions WITH (KEY_FORMAT='JSON')
  AS SELECT gender, regionid, COUNT(*) AS numusers
  FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
  WINDOW TUMBLING (SIZE 30 SECOND)
  GROUP BY gender, regionid
  HAVING COUNT(*) > 1
  EMIT CHANGES;

ksql> SELECT * from  pageviews_regions EMIT CHANGES LIMIT 1;
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|KSQL_COL_0                           |WINDOWSTART                          |WINDOWEND                            |NUMUSERS                             |
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|OTHER|+|Region_3                     |1623913830000                        |1623913860000                        |3                                    |
Limit Reached
Query terminated
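
# The KSQL_COL_0 values such as OTHER|+|Region_3 are the composite grouping
# key: gender and regionid joined by ksqlDB's |+| delimiter. When you are
# finished, persistent queries can be terminated and their streams/tables
# dropped. A cleanup sketch (the query ID below is hypothetical; IDs are
# server-assigned, so check SHOW QUERIES first):
ksql> SHOW QUERIES;
ksql> TERMINATE CTAS_PAGEVIEWS_REGIONS_5;  -- hypothetical ID; use the one SHOW QUERIES prints
ksql> DROP TABLE pageviews_regions;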

Source: https://blog.csdn.net/eyeofeagle/article/details/117999231