数据库
首页 > 数据库> > 练习 : 自定义sink mysql hbase

练习 : 自定义sink mysql hbase

作者:互联网

 

mysql

 1 package sink;
 2 
 3 //import com.util.Propss;
 4 //import com.bean.Sku;
 5 import org.apache.flink.configuration.Configuration;
 6 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 7 import org.apache.kafka.clients.producer.KafkaProducer;
 8 import org.apache.kafka.clients.producer.ProducerRecord;
 9 
10 import java.lang.reflect.Field;
11 import java.sql.*;
12 
13 /**
14  * @Description:
15  * @Author: liuyuan
16  * @Times : 2021/9/24 20:09
17  */
18 
19 //自定义SINK RichSinkFuncation
20 public class MySQLSink extends RichSinkFunction<String> {
21     private static Connection conn;
22     private static PreparedStatement pre;
23 
24     private static String database;
25     private static String sql;
26     private static Class T;
27 
28     public MySQLSink(String database,String sql,Class T){
29         this.database=database;
30         this.sql=sql;
31         this.T=T;
32     }
33 
34     @Override
35     public void open(Configuration parameters) throws Exception {
36         Class.forName("com.mysql.jdbc.Driver");
37         conn = DriverManager.getConnection("jdbc:mysql://hadoop106:3306/"+database, "root", "root");
38         conn.setAutoCommit(true);
39     }
40 
41     @Override
42     public void invoke(String value, Context context) throws Exception {
43         String[] split = value.split(",");
44         Field[] declaredFields = T.getDeclaredFields();
45         pre = conn.prepareStatement(sql);
46         for (int i = 0; i < declaredFields.length; i++) {
47             if(declaredFields[i].getType().toString().equals("class java.lang.String")){
48                 pre.setString((i+1),split[i]);
49             }
50             if(declaredFields[i].getType().toString().equals("class java.lang.Integer")){
51                 pre.setInt((i+1),Integer.valueOf(split[i]));
52             }
53             if(declaredFields[i].getType().toString().equals("class java.lang.Double")){
54                 pre.setDouble((i+1),Double.valueOf(split[i]));
55             }
56             if(declaredFields[i].getType().toString().equals("class java.lang.Long")){
57                 pre.setLong((i+1),Long.valueOf(split[i]));
58             }
59         }
60         pre.execute();
61     }
62 
63     @Override
64     public void close() throws Exception {
65         pre.close();
66         conn.close();
67 
68     }
69 }

hbase

  1 package sink;
  2 
  3 import org.apache.flink.configuration.Configuration;
  4 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  5 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
  6 import org.apache.hadoop.hbase.*;
  7 import org.apache.hadoop.hbase.client.*;
  8 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
  9 import org.apache.hadoop.hbase.util.Bytes;
 10 
 11 import java.io.IOException;
 12 import java.lang.reflect.Field;
 13 import java.util.*;
 14 
 15 public class HBaseSink extends RichSinkFunction<String> {
 16     private Connection connection;
 17     private Class T;
 18     private String tableName;
 19     private String[] fieldsName;
 20     List<Put> list=new ArrayList<Put>();
 21     
 22     public static String[] getFiledName(Class T) {
 23         Field[] fields =T.getClass().getDeclaredFields();
 24         String[] fieldName = new String[fields.length];
 25         for (int i = 0; i < fieldName.length; i++) {
 26             fieldName[i] = fields[i].getName();
 27 
 28         }
 29         return fieldName;
 30     }
 31 
 32     public HBaseSink(Class T, String tableName){
 33         this.T=T;
 34         this.tableName=tableName;
 35         this.fieldsName=getFiledName(T);
 36     }
 37 
 38     @Override
 39     public void open(Configuration parameters) throws Exception {
 40         connection= HBase_Util.getConf();
 41     }
 42 
 43     @Override
 44     public void invoke(String value, Context context) throws Exception {
 45         String[] s1 = value.split(",");
 46         Table table = connection.getTable(TableName.valueOf(tableName));
 47 //        String rowkey = UUID.randomUUID().toString().replaceAll("-", "");
 48         Put put = new Put(Bytes.toBytes(s1[0]));
 49         for (int i = 0; i < fieldsName.length; i++) {
 50             put.addColumn(Bytes.toBytes("info"),Bytes.toBytes(fieldsName[i]),Bytes.toBytes(s1[i]));
 51             list.add(put);
 52         }
 53         table.put(list);
 54     }
 55 
 56     @Override
 57     public void close() throws Exception {
 58         connection.close();
 59     }
 60 
 61 
 62     public static  class HBase_Util {
 63         static org.apache.hadoop.conf.Configuration con = HBaseConfiguration.create();
 64         static org.apache.hadoop.conf.Configuration conf = Propss.setConf(con);
 65         static Connection connection;
 66         static HBaseAdmin admin;
 67         static Table t;
 68 
 69         static {
 70             try {
 71                 connection = ConnectionFactory.createConnection(conf);
 72                 admin = (HBaseAdmin)connection.getAdmin();
 73             } catch (IOException e) {
 74                 e.printStackTrace();
 75             }
 76         }
 77         //获取 conn
 78         public static Connection getConf(){
 79             //创建HBase的配置对象
 80             org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
 81             //设置hbase配置属性
 82             conf.set("hbase.zookeeper.quorum","hadoop106,hadoop107,hadoop108");
 83             conf.set("hbase.zookeeper.property.clientPort","2181");
 84             Connection connection=null;
 85             //通过连接函数,创建连接对象
 86             try {
 87                 connection = ConnectionFactory.createConnection(conf);
 88 
 89             } catch (IOException e) {
 90                 e.printStackTrace();
 91             }
 92             return connection;
 93         }
 94 
 95         //建表
 96         public static void build_Table(String tableName,List<String> FamilyNames) throws Exception {
 97             TableDescriptorBuilder buider = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
 98             for (String columnName : FamilyNames) {
 99                 ColumnFamilyDescriptor info = ColumnFamilyDescriptorBuilder.of(Bytes.toBytes(columnName));
100                 buider.setColumnFamily(info);
101             }
102             TableDescriptor build = buider.build();
103             admin.createTable(build);
104             System.out.println("____build_done____");
105         }
106 
107         //插入一条数据
108         public static void insert_Row(String tableName,String row,String Family,String qualifier,String value) throws Exception {
109             t = connection.getTable(TableName.valueOf(tableName));
110             Put put = new Put(Bytes.toBytes(row));
111             Put put1 = put.addColumn(Bytes.toBytes(Family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
112             t.put(put1);
113             System.out.println("____insert_Row_done____");
114         }
115 
116         //插入num条数据
117         public  static  void insert_Batch(String tableName,String row,String Family,String qualifier,String value,Integer num) throws Exception {
118             t = connection.getTable(TableName.valueOf(tableName));
119             List<Put> list=new ArrayList<>();
120             for (int i = 0; i < num; i++) {
121                 String s = UUID.randomUUID().toString().replaceAll("-", "");
122                 Put puts = new Put(Bytes.toBytes(s));
123                 Put putss = puts.addColumn(Bytes.toBytes(Family), Bytes.toBytes(qualifier), Bytes.toBytes(value+i));
124                 list.add(putss);
125             }
126             t.put(list);
127             System.out.println("____insert_Batch_done____");
128         }
129 
130         //删除表
131         public static void drop_Table(String tableName) throws Exception {
132             if (admin.tableExists(TableName.valueOf(tableName))){
133                 admin.disableTable(TableName.valueOf(tableName));
134                 admin.deleteTable(TableName.valueOf(tableName));
135                 System.out.println("____drop_Table_done____");
136             }else {
137                 System.out.println("____no_such_Table_found____");
138             }
139 
140         }
141 
142         //删除一条数据
143         public static void  delete_Row(String tableName,String row) throws Exception {
144             t = connection.getTable(TableName.valueOf(tableName));
145             Delete delete = new Delete(Bytes.toBytes(row));
146             t.delete(delete);
147             System.out.println("____delete_Row_done____");
148         }
149 
150         //特定列过滤查询
151         public static void scan_Filter(String tableName, String Family, String qualifier, CompareOperator compare, byte[] value) throws Exception {
152             t = connection.getTable(TableName.valueOf(tableName));
153             SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(Family), Bytes.toBytes(qualifier), compare, value);
154             Scan scan = new Scan();
155             Scan scan1 = scan.setFilter(filter);
156             ResultScanner scanner = t.getScanner(scan1);
157             Iterator<Result> iterator = scanner.iterator();
158             while (iterator.hasNext()){
159                 Cell[] cells = iterator.next().rawCells();
160                 System.out.println("____"+new String(CellUtil.cloneRow(cells[0]))+"____");
161                 for (Cell cell : cells) {
162                     System.out.print(new String(CellUtil.cloneRow(cell)));
163                     System.out.print(" - ");
164                     System.out.print(new String(CellUtil.cloneFamily(cell)));
165                     System.out.print(" - ");
166                     System.out.print(new String(CellUtil.cloneQualifier(cell)));
167                     System.out.print(" - ");
168                     System.out.println(new String(CellUtil.cloneValue(cell)));
169                 }
170 
171             }
172             System.out.println("____scan_Filter_done____");
173         }
174         //查询一条数据
175         public static void scan_Row(String tableName,String row)throws Exception{
176             t = connection.getTable(TableName.valueOf(tableName));
177             Get get = new Get(Bytes.toBytes(row));
178             Result result = t.get(get);
179             Cell[] cells = result.rawCells();
180             for (Cell cell : cells) {
181                 System.out.print(new String(CellUtil.cloneRow(cell)));
182                 System.out.print(" - ");
183                 System.out.print(new String(CellUtil.cloneFamily(cell)));
184                 System.out.print(" - ");
185                 System.out.print(new String(CellUtil.cloneQualifier(cell)));
186                 System.out.print(" - ");
187                 System.out.println(new String(CellUtil.cloneValue(cell)));
188             }
189             System.out.println("____scan_Row_done____");
190         }
191         //区间查询数据
192         public static void scan_Rows(String tableName,String row1,String row2)throws Exception{
193             t = connection.getTable(TableName.valueOf(tableName));
194             Scan sc=new Scan(Bytes.toBytes(row1),Bytes.toBytes(row2));
195             ResultScanner scanner = t.getScanner(sc);
196             Iterator<Result> iterator = scanner.iterator();
197             System.out.println("____前闭后开____");
198             while (iterator.hasNext()){
199                 Result next = iterator.next();
200                 Cell[] cells = next.rawCells();
201                 System.out.println("____"+new String(CellUtil.cloneRow(cells[0]))+"____");
202                 for (Cell cell : cells) {
203                     System.out.print(new String(CellUtil.cloneRow(cell)));
204                     System.out.print(" - ");
205                     System.out.print(new String(CellUtil.cloneFamily(cell)));
206                     System.out.print(" - ");
207                     System.out.print(new String(CellUtil.cloneQualifier(cell)));
208                     System.out.print(" - ");
209                     System.out.println(new String(CellUtil.cloneValue(cell)));
210                 }
211             }
212             System.out.println("____scan_Rows_done____");
213         }
214         //查询一条特定列族数据
215         public static void get_value_by_family(String tableName,String row,String family)throws Exception{
216             t = connection.getTable(TableName.valueOf(tableName));
217             Get get = new Get(Bytes.toBytes(row));
218             get.addFamily(Bytes.toBytes(family));
219             Result result = t.get(get);
220             Cell[] cells = result.rawCells();
221             for (Cell cell : cells) {
222                 System.out.print(new String(CellUtil.cloneRow(cell)));
223                 System.out.print(" - ");
224                 System.out.print(new String(CellUtil.cloneFamily(cell)));
225                 System.out.print(" - ");
226                 System.out.print(new String(CellUtil.cloneQualifier(cell)));
227                 System.out.print(" - ");
228                 System.out.println(new String(CellUtil.cloneValue(cell)));
229             }
230             System.out.println("____get_value_by_family_done____");
231         }
232 
233         //查询一条特定列数据
234         public static void get_value_by_qualifier(String tableName,String row,String family,String qualifier)throws Exception{
235             t = connection.getTable(TableName.valueOf(tableName));
236             Get get = new Get(Bytes.toBytes(row));
237             get.addColumn(Bytes.toBytes(family),Bytes.toBytes(qualifier));
238             Result result = t.get(get);
239             Cell[] cells = result.rawCells();
240             for (Cell cell : cells) {
241                 System.out.print(new String(CellUtil.cloneRow(cell)));
242                 System.out.print(" - ");
243                 System.out.print(new String(CellUtil.cloneFamily(cell)));
244                 System.out.print(" - ");
245                 System.out.print(new String(CellUtil.cloneQualifier(cell)));
246                 System.out.print(" - ");
247                 System.out.println(new String(CellUtil.cloneValue(cell)));
248             }
249             System.out.println("____get_value_by_qualifier_done____");
250         }
251 
252         //全查某表
253         public static void  scan_All(String tableName) throws Exception {
254             t = connection.getTable(TableName.valueOf(tableName));
255             Scan sc=new Scan();
256             ResultScanner scanner = t.getScanner(sc);
257             Iterator<Result> iterator = scanner.iterator();
258             while (iterator.hasNext()){
259                 Result next = iterator.next();
260                 Cell[] cells = next.rawCells();
261                 System.out.println("____"+new String(CellUtil.cloneRow(cells[0]))+"____");
262                 for (Cell cell : cells) {
263                     System.out.print(new String(CellUtil.cloneRow(cell)));
264                     System.out.print(" - ");
265                     System.out.print(new String(CellUtil.cloneFamily(cell)));
266                     System.out.print(" - ");
267                     System.out.print(new String(CellUtil.cloneQualifier(cell)));
268                     System.out.print(" - ");
269                     System.out.println(new String(CellUtil.cloneValue(cell)));
270                 }
271             }
272             System.out.println("____scan_All_done____");
273         }
274         //查看所有表
275         public static  void list() throws Exception {
276             TableName[] tableNames = admin.listTableNames();
277             for (TableName tableName : tableNames) {
278                 System.out.println(tableName.toString());
279             }
280 
281         }
282         //查看所有表结构
283         public static  void desc_Table(String tableName) throws Exception {
284             List<TableDescriptor> tableDescriptors = admin.listTableDescriptors();
285             Iterator<TableDescriptor> iterator = tableDescriptors.iterator();
286             while (iterator.hasNext()){
287                 TableDescriptor next = iterator.next();
288                 if (next.getTableName().toString().equals(tableName)){
289                     System.out.println(next);
290                 }
291             }
292             System.out.println("____list_done____");
293 
294         }
295 
296 
297         //关流
298         public static  void stop() throws Exception {
299             connection.close();
300             System.out.println("____connection_close_done____");
301         }
302 
303 
304         public static class Propss {
305             public static Properties producer_Props = new Properties();
306             public static Properties consumer_Props = new Properties();
307             public static HashMap<String, Object> kafka_Consumer = new HashMap<>();
308             public static HashMap<String, Object> kafka_Producer = new HashMap<>();
309 
310             public static org.apache.hadoop.conf.Configuration setConf(org.apache.hadoop.conf.Configuration conf){
311                 conf.set("hbase.zookeeper.quorum","hadoop106,hadoop107,hadoop108");
312                 conf.set("hbae.zookeeper.property.client","2181");
313                 return conf;
314             }
315             static {
316 
317                 kafka_Producer.put("bootstrap.servers", "hadoop106:9092,hadoop107:9092,hadoop108:9092");
318                 //0、1 和 all:0表示只要把消息发送出去就返回成功;1表示只要Leader收到消息就返回成功;all表示所有副本都写入数据成功才算成功
319                 kafka_Producer.put("acks", "all");
320                 //重试次数
321                 kafka_Producer.put("retries", Integer.MAX_VALUE);
322                 //批处理的字节数
323                 kafka_Producer.put("batch.size", 16384);
324                 //批处理的延迟时间,当批次数据未满之时等待的时间
325                 kafka_Producer.put("linger.ms", 1);
326                 //用来约束KafkaProducer能够使用的内存缓冲的大小的,默认值32MB
327                 kafka_Producer.put("buffer.memory", 33554432);
328                 // properties.put("value.serializer",
329                 // "org.apache.kafka.common.serialization.ByteArraySerializer");
330                 // properties.put("key.serializer",
331                 // "org.apache.kafka.common.serialization.ByteArraySerializer");
332                 kafka_Producer.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
333                 kafka_Producer.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
334 
335                 kafka_Consumer.put("bootstrap.servers", "hadoop106:9092,hadoop107:9092,hadoop108:9092");
336                 kafka_Consumer.put("group.id", "com-test");
337                 //from beginning
338                 kafka_Consumer.put("auto.offset.reset", "earliest");
339                 kafka_Consumer.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
340                 kafka_Consumer.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
341 
342                 producer_Props.setProperty("bootstrap.servers", "hadoop106:9092,hadoop107:9092,hadoop108:9092");
343                 producer_Props.setProperty("ack", "all");
344                 producer_Props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
345                 producer_Props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
346                 producer_Props.put("auto.offset.reset", "earliest");
347 
348                 consumer_Props.setProperty("bootstrap.servers", "hadoop106:9092,hadoop107:9092,hadoop108:9092");
349                 consumer_Props.setProperty("group.id", "com-test");
350                 consumer_Props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
351                 consumer_Props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
352                 consumer_Props.put("auto.offset.reset", "earliest");
353             }
354 
355 
356         }
357 
358     }
359 
360 }

 

标签:String,自定义,System,____,sink,print,new,hbase,out
来源: https://www.cnblogs.com/chang09/p/16119922.html