其他分享
首页 > 其他分享> > 整理CDCkafka各类消息格式

整理CDCkafka各类消息格式

作者:互联网

一、mysql-ogg 资料

  1. delete

{
  "table":"TABLE",
  "op_type":"D",
  "op_ts":"TIME",
  "current_ts":"TIME",
  "pos":"00000000010000045759",
  "before":{
    "c1": 1,
    "c2": "C2",
    "c3": 0.56,
    "c4": 0.9526831,
    "c5": "2021-02-03",
    "c6": 100,
    "c7": 100.125,
    "c8": "textcontent",
    "c9": 0.12345,
    "c10": "2020-02-01",
    "c11": 11,
    "c12": 12.12,
    "c13": "2021-02-13",
    "c14": 14.14,
    "c15": "lastcolumn"
  }
}
  1. insert

{
  "table":"TABLE",
  "op_type":"I",
  "op_ts":"TIME",
  "current_ts":"TIME",
  "pos":"00000000010000071669",
  "after":{
    "c1": 1,
    "c2": "C2",
    "c3": 0.56,
    "c4": 0.9526831,
    "c5": "2021-02-03",
    "c6": 100,
    "c7": 100.125,
    "c8": "textcontent",
    "c9": 0.12345,
    "c10": "2020-02-01",
    "c11": 11,
    "c12": 12.12,
    "c13": "2021-02-13",
    "c14": 14.14,
    "c15": "lastcolumn"
  }
}
  1. update

{
  "table":"TABLE",
  "op_type":"U",
  "op_ts":"TIME",
  "current_ts":"TIME",
  "pos":"00000000010000071669",
  "primary_keys":[
        "c1"
   ],
  "before":{
    "c1": 1,
    "c2": "C2",
    "c3": 0.56,
    "c4": 0.9526831,
    "c5": "2021-02-03",
    "c6": 100,
    "c7": 100.125,
    "c8": "textcontent",
    "c9": 0.12345,
    "c10": "2020-02-01",
    "c11": 11,
    "c12": 12.12,
    "c13": "2021-02-13",
    "c14": 14.14,
    "c15": "lastcolumn"
  },
  "after":{
    "c1": 1,
    "c2": "C2",
    "c3": 0.56,
    "c4": 0.9526831,
    "c5": "2021-02-03",
    "c6": 100,
    "c7": 100.125,
    "c8": "textcontent",
    "c9": 0.12345,
    "c10": "2020-02-01",
    "c11": 11,
    "c12": 12.12,
    "c13": "2021-02-13",
    "c14": 14.14,
    "c15": "lastcolumn111111111111"
  }
}

二、mysql-canal 资料

INSERT INTO order(order_id, amount) VALUES ('10086', 999);

{
  "data": [{
    "id": "1",
    "order_id": "10086",
    "amount": "999.0",
    "create_time": "2020-03-02 05:12:49"
  }],
  "database": "test",
  "es": 1583143969000,
  "id": 3,
  "isDdl": false,
  "mysqlType": {
    "id": "BIGINT",
    "order_id": "VARCHAR(64)",
    "amount": "DECIMAL(10,2)",
    "create_time": "DATETIME"
  },
  "old": null,
  "pkNames": ["id"],
  "sql": "",
  "sqlType": {
    "id": -5,
    "order_id": 12,
    "amount": 3,
    "create_time": 93
  },
  "table": "order",
  "ts": 1583143969460,
  "type": "INSERT"
}

UPDATE order SET amount = 10087 WHERE order_id = '10086';

{
  "data": [{
    "id": "1",
    "order_id": "10086",
    "amount": "10087.0",
    "create_time": "2020-03-02 05:12:49"
  }],
  "database": "test",
  "es": 1583143974000,
  "id": 4,
  "isDdl": false,
  "mysqlType": {
    "id": "BIGINT",
    "order_id": "VARCHAR(64)",
    "amount": "DECIMAL(10,2)",
    "create_time": "DATETIME"
  },
  "old": [{
    "amount": "999.0"
  }],
  "pkNames": ["id"],
  "sql": "",
  "sqlType": {
    "id": -5,
    "order_id": 12,
    "amount": 3,
    "create_time": 93
  },
  "table": "order",
  "ts": 1583143974870,
  "type": "UPDATE"
}

DELETE FROM order WHERE order_id = '10086';

{
  "data": [{
    "id": "1",
    "order_id": "10086",
    "amount": "10087.0",
    "create_time": "2020-03-02 05:12:49"
  }],
  "database": "test",
  "es": 1583143980000,
  "id": 5,
  "isDdl": false,
  "mysqlType": {
    "id": "BIGINT",
    "order_id": "VARCHAR(64)",
    "amount": "DECIMAL(10,2)",
    "create_time": "DATETIME"
  },
  "old": null,
  "pkNames": ["id"],
  "sql": "",
  "sqlType": {
    "id": -5,
    "order_id": 12,
    "amount": 3,
    "create_time": 93
  },
  "table": "order",
  "ts": 1583143981091,
  "type": "DELETE"
}

三、Oracle-ogg 资料

  1. delete

{
  "table":"TABLE",
  "op_type":"D",
  "op_ts":"TIME",
  "current_ts":"TIME",
  "pos":"00000000010000045759",
  "before": {
    "c1": 1,
    "c2": "C2",
    "c3": 0.56,
    "c4": 0.9526831,
    "c5": "2021-02-03",
    "c6": 100,
    "c7": 100.125,
    "c8": "textcontent",
    "c9": 0.12345,
    "c10": "2020-02-01",
    "c11": 11,
    "c12": 12.12,
    "c13": "2021-02-13",
    "c14": 14.14,
    "c15": "lastcolumn"
  }
}
  1. insert

{
  "table":"TABLE",
  "op_type":"I",
  "op_ts":"TIME",
  "current_ts":"TIME",
  "pos":"00000000010000071669",
  "after":{
    "c1": 1,
    "c2": "C2",
    "c3": 0.56,
    "c4": 0.9526831,
    "c5": "2021-02-03",
    "c6": 100,
    "c7": 100.125,
    "c8": "textcontent",
    "c9": 0.12345,
    "c10": "2020-02-01",
    "c11": 11,
    "c12": 12.12,
    "c13": "2021-02-13",
    "c14": 14.14,
    "c15": "lastcolumn"
  }
}
  1. update

{
  "table":"TABLE",
  "op_type":"U",
  "op_ts":"TIME",
  "current_ts":"TIME",
  "pos":"00000000010000071669",
  "before":{
    "c1": 1,
    "c2": "C2",
    "c3": 0.56,
    "c4": 0.9526831,
    "c5": "2021-02-03",
    "c6": 100,
    "c7": 100.125,
    "c8": "textcontent",
    "c9": 0.12345,
    "c10": "2020-02-01",
    "c11": 11,
    "c12": 12.12,
    "c13": "2021-02-13",
    "c14": 14.14,
    "c15": "lastcolumn"
  },
  "after":{
    "c1": 1,
    "c2": "C2",
    "c3": 0.56,
    "c4": 0.9526831,
    "c5": "2021-02-03",
    "c6": 100,
    "c7": 100.125,
    "c8": "textcontent",
    "c9": 0.12345,
    "c10": "2020-02-01",
    "c11": 11,
    "c12": 12.12,
    "c13": "2021-02-13",
    "c14": 14.14,
    "c15": "lastcolumn111111111111"
  }
}

四、Oracle-Kafka connect资料1 资料2

{
  "schema": {
    "type": "struct",
    "fields": [{
      "type": "int64",
      "optional": false,
      "field": "SCN"
    }, {
      "type": "string",
      "optional": false,
      "field": "SEG_OWNER"
    }, {
      "type": "string",
      "optional": false,
      "field": "TABLE_NAME"
    }, {
      "type": "int64",
      "optional": false,
      "name": "org.apache.kafka.connect.data.Timestamp",
      "version": 1,
      "field": "TIMESTAMP"
    }, {
      "type": "string",
      "optional": false,
      "field": "SQL_REDO"
    }, {
      "type": "string",
      "optional": false,
      "field": "OPERATION"
    }, {
      "type": "struct",
      "fields": [{
        "type": "double",
        "optional": false,
        "field": "ID"
      }, {
        "type": "string",
        "optional": true,
        "field": "NAME"
      }, {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "CREATETIME"
      }],
      "optional": true,
      "name": "value",
      "field": "data"
    }, {
      "type": "struct",
      "fields": [{
        "type": "double",
        "optional": false,
        "field": "ID"
      }, {
        "type": "string",
        "optional": true,
        "field": "NAME"
      }, {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "CREATETIME"
      }],
      "optional": true,
      "name": "value",
      "field": "before"
    }],
    "optional": false,
    "name": "zztar.whtest.t1.row"
  },
  "payload": {
    "SCN": 2847668,
    "SEG_OWNER": "WHTEST",
    "TABLE_NAME": "T1",
    "TIMESTAMP": 1600829206000,
    "SQL_REDO": "insert into \"WHTEST\".\"T1\"(\"ID\",\"NAME\",\"CREATETIME\") values (557005146,'533888119 ',TIMESTAMP ' 2020-09-23 10:46:46')",
    "OPERATION": "INSERT",
    "data": {
      "ID": 5.57005146E8,
      "NAME": "533888119",
      "CREATETIME": 1600829206000
    },
    "before": null
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [{
      "type": "int64",
      "optional": false,
      "field": "SCN"
    }, {
      "type": "string",
      "optional": false,
      "field": "SEG_OWNER"
    }, {
      "type": "string",
      "optional": false,
      "field": "TABLE_NAME"
    }, {
      "type": "int64",
      "optional": false,
      "name": "org.apache.kafka.connect.data.Timestamp",
      "version": 1,
      "field": "TIMESTAMP"
    }, {
      "type": "string",
      "optional": false,
      "field": "SQL_REDO"
    }, {
      "type": "string",
      "optional": false,
      "field": "OPERATION"
    }, {
      "type": "struct",
      "fields": [{
        "type": "double",
        "optional": false,
        "field": "ID"
      }, {
        "type": "string",
        "optional": true,
        "field": "NAME"
      }, {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "CREATETIME"
      }],
      "optional": true,
      "name": "value",
      "field": "data"
    }, {
      "type": "struct",
      "fields": [{
        "type": "double",
        "optional": false,
        "field": "ID"
      }, {
        "type": "string",
        "optional": true,
        "field": "NAME"
      }, {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "CREATETIME"
      }],
      "optional": true,
      "name": "value",
      "field": "before"
    }],
    "optional": false,
    "name": "zztar.whtest.t1.row"
  },
  "payload": {
    "SCN": 2847668,
    "SEG_OWNER": "WHTEST",
    "TABLE_NAME": "T1",
    "TIMESTAMP": 1600829206000,
    "SQL_REDO": "update \"WHTEST\".\"T1\"(\"ID\",\"NAME\",\"CREATETIME\") values (123,'123',TIMESTAMP '2020-09-23 10:46:46')",
    "OPERATION": "UPADTE",
    "data": {
      "ID": 123,
      "NAME": "123",
      "CREATETIME": 1600829206000
    },
    "before": {
      "ID": 5.57005146E8,
      "NAME": "533888119",
      "CREATETIME": 1600829206000
    }
  }
}

五、postgresql 资料

test=# INSERT INTO test_table (id, code) VALUES('id1', 'code1'); INSERT 0 1

{
  "change": [
    {
      "kind": "insert",
      "schema": "mdmv2",
      "table": "test_table",
      "columnnames": ["id", "code"],
      "columntypes": ["character(10)", "character(10)"],
      "columnvalues": ["id1", "code1"]
    }
  ]
}

test=# update test_table set code='code2' where id='id1'; UPDATE 1

{
  "change": [
    {
      "kind": "update",
      "schema": "mdmv2",
      "table": "test_table",
      "columnnames": ["id", "code"],
      "columntypes": ["character(10)", "character(10)"],
      "columnvalues": ["id1", "code2"],
      "oldkeys": {
        "keynames": ["id"],
        "keytypes": ["character(10)"],
        "keyvalues": ["id1"]
      }
    }
  ]
}

test=# delete from test_table where id='id1'; DELETE 1

{
  "change": [
    {
      "kind": "delete",
      "schema": "mdmv2",
      "table": "test_table",
      "oldkeys": {
        "keynames": ["id"],
        "keytypes": ["character(10)"],
        "keyvalues": ["id1"]
      }
    }
  ]
}

ALTER TABLE test_table REPLICA IDENTITY USING INDEX test_table_pkey;

{
  "change": [
    {
      "kind": "update",
      "schema": "mdmv2",
      "table": "test_table",
      "columnnames": ["id", "code"],
      "columntypes": ["character(10)", "character(10)"],
      "columnvalues": ["id1", "code2"],
      "oldkeys": {
        "keynames": ["id"],
        "keytypes": ["character(10)"],
        "keyvalues": ["id1       "]
      }
    }
  ]
}

六、sqlserver 资料

insert into [test_table] (UserName,IsOnline,LastLogin) values('测试', 1, DATEDIFF(s, '19700101',GETDATE()))

{
  "Id": 5,
  "UserName": "测试",
  "IsOnline": true,
  "LastLogin": 1540635666,
  "_cdc_metadata": {
    "sys_change_operation": "I",
    "sys_change_creation_version": "13",
    "sys_change_version": "13",
    "databaseName": "tgstat_ddztest",
    "schemaName": "dbo",
    "tableName": "test_online"
  }
}

update test_online Set UserName='tom' where UserName='测试'

{
  "Id": 5,
  "UserName": "tom",
  "IsOnline": true,
  "LastLogin": 1540635666,
  "_cdc_metadata": {
    "sys_change_operation": "U",
    "sys_change_creation_version": "0",
    "sys_change_version": "14",
    "databaseName": "tgstat_ddztest",
    "schemaName": "dbo",
    "tableName": "test_online"
  }
}

Delete test_online Where UserName='tom'

null

标签:02,CDCkafka,type,field,各类,table,格式,optional,id
来源: https://blog.csdn.net/sinat_41905822/article/details/118800619