其他分享
首页 > 其他分享> > datax的使用

datax的使用

作者:互联网

正文
在这里插入图片描述

简介

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX采用了框架 + 插件 的模式,目前已开源,代码托管在github。

语法

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

Support Data Channels

在这里插入图片描述

MysqlReader

{
	"job": {
		"setting": {
			"speed": {
				"channel": 3
			},
			"errorLimit": {
				"record": 0,
				"percentage": 0.02
			}
		},
		"content": [{
			"reader": {
				"name": "mysqlreader",
				"parameter": {
					"username": "root",
					"password": "root",
					"column": [
						"id",
						"name"
					],
					"splitPk": "db_id",
					"connection": [{
						"table": [
							"table"
						],
						"jdbcUrl": [
							"jdbc:mysql://127.0.0.1:3306/database"
						]
					}]
				}
			},
			"writer": {
				"name": "streamwriter",
				"parameter": {
					"print": true
				}
			}
		}]
	}
}
{
	"job": {
		"setting": {
			"speed": {
				"channel": 1
			}
		},
		"content": [{
			"reader": {
				"name": "mysqlreader",
				"parameter": {
					"username": "root",
					"password": "root",
					"connection": [{
						"querySql": ["select db_id,on_line_flag from db_info where db_id < 10;"],
						"jdbcUrl": ["jdbc:mysql://bad_ip:3306/database", "jdbc:mysql://127.0.0.1:bad_port/database", "jdbc:mysql://127.0.0.1:3306/database"]
					}]
				}
			},
			"writer": {
				"name": "streamwriter",
				"parameter": {
					"print": false,
					"encoding": "UTF-8"
				}
			}
		}]
	}
}

jdbcUrl

username

password

table

column

splitPk

where

querySql

类型转换

目前MysqlReader支持大部分Mysql类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出MysqlReader针对Mysql类型转换列表:
在这里插入图片描述

MysqlWriter

{
	"job": {
		"setting": {
			"speed": {
				"channel": 1
			}
		},
		"content": [{
			"reader": {
				"name": "streamreader",
				"parameter": {
					"column": [{
							"value": "DataX",
							"type": "string"
						},
						{
							"value": 19880808,
							"type": "long"
						},
						{
							"value": "1988-08-08 08:08:08",
							"type": "date"
						},
						{
							"value": true,
							"type": "bool"
						},
						{
							"value": "test",
							"type": "bytes"
						}
					],
					"sliceRecordCount": 1000
				}
			},
			"writer": {
				"name": "mysqlwriter",
				"parameter": {
					"writeMode": "insert",
					"username": "root",
					"password": "root",
					"column": [
						"id",
						"name"
					],
					"session": [
						"set session sql_mode='ANSI'"
					],
					"preSql": [
						"delete from test"
					],
					"connection": [{
						"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
						"table": [
							"test"
						]
					}]
				}
			}
		}]
	}
}

column

类似 MysqlReader ,目前 MysqlWriter 支持大部分 Mysql 类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出 MysqlWriter 针对 Mysql 类型转换列表:
在这里插入图片描述

DataX HdfsReader 插件文档

HdfsReader实现了从Hadoop分布式文件系统Hdfs中读取文件数据并转为DataX协议的功能。textfile是Hive建表时默认使用的存储格式,数据不做压缩,本质上textfile就是以文本的形式将数据存放在hdfs中,对于DataX而言,HdfsReader实现上类比TxtFileReader,有诸多相似之处。orcfile,它的全名是Optimized Row Columnar file,是对RCFile做了优化。据官方文档介绍,这种文件格式可以提供一种高效的方法来存储Hive数据。HdfsReader利用Hive提供的OrcSerde类,读取解析orcfile文件的数据。目前HdfsReader支持的功能如下:

  1. 支持textfile、orcfile、rcfile、sequence file和csv格式的文件,且要求文件内容存放的是一张逻辑意义上的二维表。
  2. 支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量。
  3. 支持递归读取、支持正则表达式("*“和”?")。
  4. 支持orcfile数据压缩,目前支持SNAPPY,ZLIB两种压缩方式。
  5. 多个File可以支持并发读取。
  6. 支持sequence file数据压缩,目前支持lzo压缩方式。
  7. csv类型支持压缩格式有:gzip、bz2、zip、lzo、lzo_deflate、snappy。
  8. 目前插件中Hive版本为1.1.1,Hadoop版本为2.7.1(Apache[为适配JDK1.7],在Hadoop 2.5.0, Hadoop 2.6.0 和Hive 1.2.0测试环境中写入正常;其它版本需后期进一步测试;
  9. 支持kerberos认证(注意:如果用户需要进行kerberos认证,那么用户使用的Hadoop集群版本需要和hdfsreader的Hadoop版本保持一致,如果高于hdfsreader的Hadoop版本,不保证kerberos认证有效)

暂时不能做到:

  1. 单个File支持多线程并发读取,这里涉及到单个File内部切分算法。二期考虑支持。
  2. 目前还不支持hdfs HA;
{
	"job": {
		"setting": {
			"speed": {
				"channel": 3
			}
		},
		"content": [{
			"reader": {
				"name": "hdfsreader",
				"parameter": {
					"path": "/user/hive/warehouse/mytable01/*",
					"defaultFS": "hdfs://xxx:port",
					"column": [{
							"index": 0,
							"type": "long"
						},
						{
							"index": 1,
							"type": "boolean"
						},
						{
							"type": "string",
							"value": "hello"
						},
						{
							"index": 2,
							"type": "double"
						}
					],
					"fileType": "orc",
					"encoding": "UTF-8",
					"fieldDelimiter": ","
				}

			},
			"writer": {
				"name": "streamwriter",
				"parameter": {
					"print": true
				}
			}
		}]
	}
}

path

defaultFS

fileType

column

fieldDelimiter

encoding

nullFormat

compress

hadoopConfig

DataX HdfsWriter 插件文档

  1. 目前HdfsWriter仅支持textfile和orcfile两种格式的文件,且文件内容存放的必须是一张逻辑意义上的二维表;
  2. 由于HDFS是文件系统,不存在schema的概念,因此不支持对部分列写入;
  3. 目前仅支持与以下Hive数据类型: 数值型:TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE 字符串类型:STRING,VARCHAR,CHAR 布尔类型:BOOLEAN 时间类型:DATE,TIMESTAMP 目前不支持:decimal、binary、arrays、maps、structs、union类型;
  4. 对于Hive分区表目前仅支持一次写入单个分区;
  5. 对于textfile需用户保证写入hdfs文件的分隔符与在Hive上创建表时的分隔符一致,从而实现写入hdfs数据与Hive表字段关联;
  6. HdfsWriter实现过程是:首先根据用户指定的path,创建一个hdfs文件系统上不存在的临时目录,创建规则:path_随机;然后将读取的文件写入这个临时目录;全部写入后再将这个临时目录下的文件移动到用户指定目录(在创建文件时保证文件名不重复); 最后删除临时目录。如果在中间过程发生网络中断等情况造成无法与hdfs建立连接,需要用户手动删除已经写入的文件和临时目录。
  7. 目前插件中Hive版本为1.1.1,Hadoop版本为2.7.1(Apache[为适配JDK1.7],在Hadoop 2.5.0, Hadoop 2.6.0 和Hive 1.2.0测试环境中写入正常;其它版本需后期进一步测试;
  8. 目前HdfsWriter支持Kerberos认证(注意:如果用户需要进行kerberos认证,那么用户使用的Hadoop集群版本需要和hdfsreader的Hadoop版本保持一致,如果高于hdfsreader的Hadoop版本,不保证kerberos认证有效)
{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/Users/shf/workplace/txtWorkplace/job/dataorcfull.txt"],
                        "encoding": "UTF-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "long"
                            },
                            {
                                "index": 1,
                                "type": "long"
                            },
                            {
                                "index": 2,
                                "type": "long"
                            },
                            {
                                "index": 3,
                                "type": "long"
                            },
                            {
                                "index": 4,
                                "type": "DOUBLE"
                            },
                            {
                                "index": 5,
                                "type": "DOUBLE"
                            },
                            {
                                "index": 6,
                                "type": "STRING"
                            },
                            {
                                "index": 7,
                                "type": "STRING"
                            },
                            {
                                "index": 8,
                                "type": "STRING"
                            },
                            {
                                "index": 9,
                                "type": "BOOLEAN"
                            },
                            {
                                "index": 10,
                                "type": "date"
                            },
                            {
                                "index": 11,
                                "type": "date"
                            }
                        ],
                        "fieldDelimiter": "\t"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://xxx:port",
                        "fileType": "orc",
                        "path": "/user/hive/warehouse/writerorc.db/orcfull",
                        "fileName": "xxxx",
                        "column": [
                            {
                                "name": "col1",
                                "type": "TINYINT"
                            },
                            {
                                "name": "col2",
                                "type": "SMALLINT"
                            },
                            {
                                "name": "col3",
                                "type": "INT"
                            },
                            {
                                "name": "col4",
                                "type": "BIGINT"
                            },
                            {
                                "name": "col5",
                                "type": "FLOAT"
                            },
                            {
                                "name": "col6",
                                "type": "DOUBLE"
                            },
                            {
                                "name": "col7",
                                "type": "STRING"
                            },
                            {
                                "name": "col8",
                                "type": "VARCHAR"
                            },
                            {
                                "name": "col9",
                                "type": "CHAR"
                            },
                            {
                                "name": "col10",
                                "type": "BOOLEAN"
                            },
                            {
                                "name": "col11",
                                "type": "date"
                            },
                            {
                                "name": "col12",
                                "type": "TIMESTAMP"
                            }
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress":"NONE"
                    }
                }
            }
        ]
    }
}

defaultFS

fileType

path

fileName

column

writeMode

fieldDelimiter

compress

hadoopConfig

encoding

类型转换

目前 HdfsWriter 支持大部分 Hive 类型,请注意检查你的类型。

下面列出 HdfsWriter 针对 Hive 数据类型转换列表:
在这里插入图片描述

DataX安装部署及测试

下载压缩包

下载页面地址:https://github.com/alibaba/DataX

在页面中【Quick Start】—>【Download DataX下载地址】进行下载。下载后的包名:datax.tar.gz。

解压后{datax}目录下有{bin conf job lib log log_perf plugin script tmp}几个目录。

安装

将下载后的压缩包直接解压后可用,前提是对应的java及python环境满足要求。

测试

进入datax目录下的bin中,里面有datax.py文件,可以在cmd中测试:

python D:\datax\bin\datax.py D:\datax\job\job.json

使用即执行一个python脚本,传入json配置文件
配置文件,可以查看模版样例,模版

如果乱码,可以在cmd中输入:

CHCP 65001

使用DataX将mysql数据导入到oracle中

配置json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "****",
                        "password": "****",
                        "column": ["rank","payment"],
                        "connection": [
                            {
                                "table": [
                                    "salary"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://127.0.0.1:3306/test"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "oraclewriter",
                    "parameter": {
                        "username": "****",
                        "password": "****",
                        "column": [
                            "rank",
                            "payment"
                        ],
                        "preSql": [
                            "delete from oracle_test"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:oracle:thin:@127.0.0.1:1521:test",
                                "table": [
                                    "oracle_test"
                                ]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

cmd执行

python d:\datax\bin\datax.py E:\datax\Mysql2Oracle.json

DataX常用Job样例

hdfs2mysql

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3,
        "record": -1,
        "batchSize": 2048
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.019999999552965164
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "/user/hive/dataWarehouse/dwd.db/data-integration//ff8080817814aa2901782079d3d5046a/a3e00948d07c424cb2f31c2b46a362ad/part-00000",
            "fieldDelimiter": "\t",
            "encoding": "UTF-8",
            "fileType": "text",
            "column": [
              {
                "index": "0",
                "type": "string"
              },
              {
                "index": "1",
                "type": "string"
              },
              {
                "index": "12",
                "type": "string"
              },
              {
                "index": "11",
                "type": "string"
              },
              {
                "index": "9",
                "type": "string"
              },
              {
                "index": "7",
                "type": "string"
              },
              {
                "index": "6",
                "type": "string"
              }
            ],
            "defaultFS": "hdfs://master:8020"
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "writeMode": "update",
            "preSql": [],
            "session": [],
            "column": [
              "report_error_id",
              "task_info_id",
              "report_error_extend",
              "report_error_des",
              "report_error_content",
              "column_name",
              "table_name"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://hadoop-02:3306/test",
                "table": [
                  "t_inspect_error"
                ]
              }
            ],
            "splitPk": "",
            "password": "Dgms2019!"
          }
        }
      }
    ]
  }
}

mysql2hdfs

{
	"job": {
		"setting": {
			"speed": {
				"channel": 3
			},
			"errorLimit": {
				"record": 0,
				"percentage": 0.02
			}
		},
		"content": [{
			"reader": {
				"name": "mysqlreader",
				"parameter": {
					"username": "root",
					"password": "123456",
					"column": [
						"name",
						"age"
					],
					"splitPk": "age",
					"connection": [{
						"table": [
							"USER"
						],
						"jdbcUrl": [
							"jdbc:mysql://hadoop-02:3306/test"
						]
					}]
				}
			},
			"writer": {
				"name": "hdfswriter",
				"parameter": {
					"defaultFS": "hdfs://hadoop-01:9000",
					"fileType": "text",
					"path": "/input/mysql2hive",
					"fileName": "1.dat",
					"column": [{
							"name": "name",
							"type": "STRING"
						},
						{
							"name": "name",
							"type": "int"
						}
					],
					"writeMode": "append",
					"fieldDelimiter": "\t",
				}
			}
		}]
	}
}

标签:必选,name,column,datax,使用,默认值,type,DataX
来源: https://blog.csdn.net/Marsin_csdn/article/details/114961704