e) Exit out from root and login as the user you set up, and get familiar with the tables
mysql -u airawat -p employees
Now we have the data available to start playing with Sqoop.
C) Installing Sqoop
Refer the Apache Sqoop website for installation.
Sqoop is a client-side tool, so it needs to be installed only on the client on which you wish to use Sqoop.
D) Save mySQl driver
Download the-
mysql driver called mysql-connector-java-5.1.25-bin.jar,
from http://dev.mysql.com/downloads/connector/j/
and place it at $SQOOP_HOME/lib on node running sqoop
Note: If you installed CDH, copy to
/opt/cloudera/parcels/CDH-4.2.0-1.cdh4.2.0.p0.10/lib/sqoop/lib/
E) sqoop list commands
Run the commands on the Unix prompt, on the node where you have sqoop installed.
E.1. List databases
Lists databases in your mysql database.
$ sqoop list-databases --connect jdbc:mysql://<<mysql-server>>/employees --username airawat --password myPassword
.
.
.
13/05/31 16:45:58 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
information_schema
employees
test
E.2. List tables
Lists tables in your mysql database.
$ sqoop list-tables --connect jdbc:mysql://<<mysql-server>>/employees --username airawat --password myPassword
.
.
.
13/05/31 16:45:58 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
departments
dept_emp
dept_manager
employees
employees_exp_stg
employees_export
salaries
titles
F) Importing data in MySql into HDFS
Replace "airawat-mySqlServer-node" with the host name of the node running mySQL server, replace login credentials and target directory.
F1. Importing a table into HDFS - basic import
$ sqoop import \
--connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID \
--password myPWD \
--table employees \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/employees
.
.
.
.9139 KB/sec)
13/05/31 22:32:25 INFO mapreduce.ImportJobBase: Retrieved 300024 records
F2. Executing imports with an options file for static information
Rather than repeat the import command along with connection related input required, each time, you can pass an options file as an argument to sqoop. Create a text file, as follows, and save it someplace, locally on the node you are running the sqoop client on.
Note: This blog does not cover it, but you can create and use a password file as well, that you can pass as argument --password-file <<filename>>.
F2.1. Sample Options file:
___________________________________________________________________________
$ vi SqoopImportOptions.txt
#
#Options file for sqoop import
#
import
--connect
jdbc:mysql://airawat-mySqlServer-node/employees
--username
myUID
--password
myPwd
#
#All other commands should be specified in the command line
___________________________________________________________________________
F2.2. Command to execute import, using an options file:
Note: Provide the proper path for the options file
F2.2.1. The command
$ sqoop --options-file SqoopImportOptions.txt \
--table departments \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/departments
.
.
.
13/05/31 22:48:55 INFO mapreduce.ImportJobBase: Transferred 153 bytes in 26.2453 seconds (5.8296 bytes/sec)
13/05/31 22:48:55 INFO mapreduce.ImportJobBase: Retrieved 9 records.
-m argument is to specify number of mappers. The department table has a handful of records, so I am setting it to 1.
F2.2.2. Files created in HDFS:
$ hadoop fs -ls -R sqoop-mysql/
drwxr-xr-x - airawat airawat 0 2013-05-31 22:48 sqoop-mysql/departments
-rw-r--r-- 3 airawat airawat 0 2013-05-31 22:48 sqoop-mysql/departments/_SUCCESS
drwxr-xr-x - airawat airawat 0 2013-05-31 22:48 sqoop-mysql/departments/_logs
drwxr-xr-x - airawat airawat 0 2013-05-31 22:48 sqoop-mysql/departments/_logs/history
-rw-r--r-- 3 airawat airawat 79467 2013-05-31 22:48 sqoop-mysql/departments/_logs/history/cdh-jt01_1369839495962_job_201305290958_0062_conf.xml
-rw-r--r-- 3 airawat airawat 12441 2013-05-31 22:48 sqoop-mysql/departments/_logs/history/job_201305290958_0062_1370058514473_ airawat_departments.jar
-rw-r--r-- 3 airawat airawat 153 2013-05-31 22:48 sqoop-mysql/departments/part-m-00000
F2.2.3. Data file contents:
$ hadoop fs -cat sqoop-mysql/departments/part-m-00000 | more
d009,Customer Service
d005,Development
d002,Finance
d003,Human Resources
d001,Marketing
d004,Production
d006,Quality Management
d008,Research
d007,Sales
F3. Import all rows of a table in mySQL, but specific columns of the table
Note: This did not work for me..I am merely providing the statement...will try back to see if this is a defect in my syntax or a sqoop defect..might be related to case of the column names defined in the database versus the sqoop import statement
$ sqoop --options-file SqoopImportOptions.txt \
--table dept_emp \
--columns “EMP_NO,DEPT_NO,FROM_DATE,TO_DATE” \
--as-textfile \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/DeptEmp
Error:
13/05/31 23:01:53 ERROR util.SqlTypeMap: It seems like you are looking up a column that does not
13/05/31 23:01:53 ERROR util.SqlTypeMap: exist in the table. Please ensure that you've specified
13/05/31 23:01:53 ERROR util.SqlTypeMap: correct column names in Sqoop options.
13/05/31 23:01:53 ERROR tool.ImportTool: Imported Failed: column not found: ‘EMP_NO'
F4. Import all columns, filter rows using where clause
$ sqoop --options-file SqoopImportOptions.txt \
--table employees \
--where "emp_no > 499948" \
--as-textfile \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/employeeGtTest
F5. Import with a free form query without where clause
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/employeeFrfrmQrySmpl2
(Case of the column needs to match that used to create table, or else the import fails)
F6. Import with a free form query with where clause
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where EMP_NO < 20000 AND $CONDITIONS' \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/employeeFrfrmQry1
F7. Controlling parallelism and what's with the $CONDITIONS in the where clause?
This section is straight from the Apache User Guide.
$Conditions
If you want to import the results of a query in parallel, then each map task will need to execute a copy of the query, with results partitioned bybounding conditions inferred by Sqoop. Your query must include the token $CONDITIONS
which each Sqoop process will replace with a unique condition expression. You must also select a splitting column with --split-by
.
Controlling parallelism
Sqoop imports data in parallel from most database sources. You can specify the number of map tasks (parallel processes) to use to perform the import by using the -m
or --num-mappers
argument. Each of these arguments takes an integer value which corresponds to the degree of parallelism to employ. By default, four tasks are used. Some databases may see improved performance by increasing this value to 8 or 16. Do not increase the degree of parallelism greater than that available within your MapReduce cluster; tasks will run serially and will likely increase the amount of time required to perform the import. Likewise, do not increase the degree of parallism higher than that which your database can reasonably support. Connecting 100 concurrent clients to your database may increase the load on the database server to a point where performance suffers as a result.
When performing parallel imports, Sqoop needs a criterion by which it can split the workload. Sqoop uses a splitting column to split the workload. By default, Sqoop will identify the primary key column (if present) in a table and use it as the splitting column. The low and high values for the splitting column are retrieved from the database, and the map tasks operate on evenly-sized components of the total range. For example, if you had a table with a primary key column of id
whose minimum value was 0 and maximum value was 1000, and Sqoop was directed to use 4 tasks, Sqoop would run four processes which each execute SQL statements of the form SELECT * FROM sometable WHERE id >= lo AND id < hi
, with (lo, hi)
set to (0, 250), (250, 500), (500, 750), and (750, 1001) in the different tasks.
If the actual values for the primary key are not uniformly distributed across its range, then this can result in unbalanced tasks. You should explicitly choose a different column with the --split-by
argument. For example, --split-by employee_id
.
Note: Sqoop cannot currently split on multi-column indices. If your table has no index column, or has a multi-column key, then you must also manually choose a splitting column.
F8. Direct connector
By default, the import process will use JDBC. Performance can be improved by using database specific and native data movement tools. Like for example, MySQL provides the mysqldump tool which can export data from MySQL to other systems very quickly. When we provide the argument, "--direct" we are specifying that Sqoop should attempt the direct import channel.
Note: Currently, direct mode does not support imports of large object columns.
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where EMP_NO < 20000 AND $CONDITIONS' \
-m 1 \
--direct \
--target-dir /user/airawat/sqoop-mysql/employeeUsingDirect
F9. Import formats
With mysql, text file is the only format supported; Avro and Sequence file formatted imports are feasible through other RDBMS - refer Apache Sqoop documentation for more information.
Text file is the default format.
F10. Split by
Refer section on controlling parallelism..
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/SplitByExampleImport
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--boundary-query “SELECT MIN(EMP_NO), MAX(EMP_NO) from employees” \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/BoundaryQuerySample
F12. Fetch size
This argument specifies to sqoop the number of entries to read from database at once.
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--fetch-size=50000 \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/FetchSize
F13. Compression
Use the --compress argument to enable compression; If you dont specify a compression codec (--compression-codec), the default gzip will be used.
The command:
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
-z \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/CompressedSample
The output:
$ hadoop fs -ls -R sqoop-mysql/CompressedSample | grep part*
-rw-r--r-- 3 airawat airawat 896377 2013-05-31 23:49 sqoop-mysql/CompressedSample/part-m-00000.gz
-rw-r--r-- 3 airawat airawat 499564 2013-05-31 23:49 sqoop-mysql/CompressedSample/part-m-00001.gz
-rw-r--r-- 3 airawat airawat 409199 2013-05-31 23:49 sqoop-mysql/CompressedSample/part-m-00002.gz
-rw-r--r-- 3 airawat airawat 907330 2013-05-31 23:49 sqoop-mysql/CompressedSample/part-m-00003.gz
F14. Incremental imports
F14.1. Prep
Import part of the employee table ahead of time..
The command:
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where EMP_NO < 15000 AND $CONDITIONS' \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/IncrementalImports
The number of records imported:
$ hadoop fs -ls -R sqoop-mysql/IncrementalImports |grep part* | awk '{print $8}' |xargs hadoop fs -cat | wc -l
4999
F14.2. Run the incremental import
The three arguments highlighted in yellow need to be specified.
Arguments
--check-column (col): Specifies the column to be examined when determining which rows to import.
--incremental (mode): Specifies how Sqoop determines which rows are new. Legal values for mode include append and lastmodified.
--last-value (value): Specifies the maximum value of the check column from the previous import.
Command
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--check-column EMP_NO \
--incremental append \
--last-value 14999 \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/IncrementalImports
Record count in mysql:
mysql> select count(*) from employees;
+----------+
| count(*) |
+----------+
| 300024 |
+----------+
1 row in set (0.12 sec)
Record count in HDFS:
$ hadoop fs -ls -R sqoop-mysql/IncrementalImports |grep part* | awk '{print $8}' |xargs hadoop fs -cat | wc -l
300024
F15. Output line formatting options
Refer Apache sqoop documentation for more on this topic..
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--fields-terminated-by , \
--escaped-by \\ \
--enclosed-by '\"' \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/LineFormattingOptions
There is an argument called "--mysql-delimiters". It uses MySQL’s default delimiter set: fields: , lines: \n escaped-by:\ optionally-enclosed-by: '
F16. Import all tables
Command:
$ sqoop --options-file SqoopImportAllTablesOptions.txt \
--direct \
--warehouse-dir sqoop-mysql/EmployeeDatabase
Options file content:
$ more SqoopImportAllTablesOptions.txt
______________________________________________
#
#Options file for sqoop import
#
import-all-tables
--connect
jdbc:mysql://airawat-mySqlServer-node/employees
--username
myUID
--password
myPWD
#
#All other commands should be specified in the command line
______________________________________________
Files generated:
$ hadoop fs -ls -R sqoop-mysql/EmployeeDatabase/*/part* | awk '{print $8}'
sqoop-mysql/EmployeeDatabase/departments/part-m-00000
sqoop-mysql/EmployeeDatabase/departments/part-m-00001
sqoop-mysql/EmployeeDatabase/departments/part-m-00002
sqoop-mysql/EmployeeDatabase/departments/part-m-00003
sqoop-mysql/EmployeeDatabase/dept_emp/part-m-00000
sqoop-mysql/EmployeeDatabase/dept_emp/part-m-00001
sqoop-mysql/EmployeeDatabase/dept_emp/part-m-00002
sqoop-mysql/EmployeeDatabase/dept_emp/part-m-00003
sqoop-mysql/EmployeeDatabase/dept_manager/part-m-00000
sqoop-mysql/EmployeeDa...........
G1. Direct and quick queries or inserts and updates with sqoop eval
From the Apache Sqoop user guide - "The eval tool allows users to quickly run simple SQL queries against a database; results are printed to the console. This allows users to preview their import queries to ensure they import the data they expect.
G1.1. Query:
$ sqoop eval --connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID \
--password myPWD \
--query "select * from employees limit 2"
---------------------------------------------------------------------------------
| emp_no | birth_date | first_name | last_name | gender | hire_date |
---------------------------------------------------------------------------------
| 10001 | 1953-09-02 | Georgi | Facello | M | 1986-06-26 |
| 10002 | 1964-06-02 | Bezalel | Simmel | F | 1985-11-21 |
G1.2. Insert:
sqoop eval --connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID \
--password myPWD \
-e "insert into employees_export values(550000,'1977-08-08','Mouse','Mickey','M','1999-04-12')"
G2. Sqoop code-gen
The codegen tool generates Java classes which encapsulate and interpret imported records. The Java definition of a record is instantiated as part of the import process, but can also be performed separately. For example, if Java source is lost, it can be recreated. New versions of a class can be created which use different delimiters between fields, and so on.
$ hadoop fs -mkdir sqoop-mysql/jars
$ sqoop codegen --connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID \
--password myPWD \
--table employees \
--outdir /user/airawat/sqoop-mysql/jars
--Note: Sqoop could not create directory; I created it and tried again, it failed yet again. Need to look into this.
13/05/31 16:19:24 ERROR orm.CompilationManager: Could not make directory: /user/airawat/sqoop-mysql/jars
13/05/31 16:19:24 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-airawat/compile/c88afaebb5744b4b2e45b6119540834a/employees.jar
G3. Sqoop merge
The merge tool allows you to combine two datasets where entries in one dataset should overwrite entries of an older dataset. For example, an incremental import run in last-modified mode will generate multiple datasets in HDFS where successively newer data appears in each dataset. The merge tool will "flatten" two datasets into one, taking the newest available records for each primary key.
G3.1. Create test data in Mysql
G3.1.1 - Initial dataset
mysql> create table initial_emp as
select emp_no,birth_date, first_name,last_name,gender,hire_date from employees where emp_no <= 100000
union
select emp_no,birth_date, 'null' as first_name,last_name,gender,hire_date from employees where emp_no > 100000 and emp_no <= 300000;
--I created a table with fewer records than the employee table, and with some nulls for first name; My next import will include the records from the employee table with emp_no > 100000 and <= 300000, and also records with emp_no > 300000; With the Merge, we should see the final dataset with the full employee table;
G3.1.2 - Incremental dataset
mysql> create table final_emp as
select emp_no,birth_date,first_name,last_name,gender,hire_date from employees where emp_no > 100000 and emp_no <= 300000
Union
select emp_no,birth_date,first_name,last_name,gender,hire_date from employees where emp_no > 300000;
G3.2. Import the first dataset into HDFS
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select emp_no,birth_date, first_name,last_name,gender,hire_date from initial_emp where $CONDITIONS' \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/InitialDataSet
hadoop fs -ls -R sqoop-mysql/InitialDataSet |grep part* | awk '{print $8}' |xargs hadoop fs -cat | wc -l
200024
$ hadoop fs -ls -R sqoop-mysql/InitialDataSet |grep part* | awk '{print $8}' | xargs hadoop fs -cat | grep 'null' | wc -l
110024
G3.3. Import the last dataset into HDFS
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select emp_no,birth_date,first_name,last_name,gender, hire_date from final_emp where $CONDITIONS' \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/FinalDataSet
$ hadoop fs -ls -R sqoop-mysql/FinalDataSet |grep part* | awk '{print $8}' |xargs hadoop fs -cat | wc -l
210024
G3.4. Generate jar and class file for employee table
$ sqoop codegen --connect jdbc:mysql://cdh-dev01/employees \
--username myUID \
--password myPWD \
--table employees \
--outdir /user/airawat/sqoop-mysql/jars
.
.
.
13/06/03 10:29:18 ERROR orm.CompilationManager: Could not make directory: /user/airawat/sqoop-mysql/jars
13/06/03 10:29:18 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-airawat/compile/879394521045bc924ad9321fe46374bc/employees.jar
Files created:
$ ls /tmp/sqoop-airawat/compile/879394521045bc924ad9321fe46374bc/
employees.class employees.jar employees.java
Copy files to your home directory:
cp /tmp/sqoop-airawat/compile/879394521045bc924ad9321fe46374bc/* .
G3.4. Execute the merge
$ sqoop merge --new-data sqoop-mysql/FinalDataSet --onto sqoop-mysql/InitialDataSet --target-dir sqoop-mysql/MergedDataSet \
--jar-file employees.jar --class-name employees --merge-key emp_no
Note: If I tried running this command with “EMP_NO” as merge-key instead of “emp-no”, I got errors.
$ hadoop fs -ls -R sqoop-mysql/MergedDataSet |grep part* | awk '{print $8}' |xargs hadoop fs -cat | wc -l
300024
We have the expected number of records - 300024; Now lets check if any have 'null' in them...they should not.
$ hadoop fs -ls -R sqoop-mysql/MergedDataSet |grep part* | awk '{print $8}' | xargs hadoop fs -cat | grep 'null' | wc -l
0
A quick look at the data...
$ hadoop fs -cat sqoop-mysql/MergedDataSet/part-r-00000 | more
100000,1956-01-11,Hiroyasu,Emden,M,1991-07-02
100002,1957-03-04,Claudi,Kolinko,F,1988-02-20
100004,1960-04-16,Avishai,Nitsch,M,1986-01-03
100006,1956-07-13,Janalee,Himler,F,1986-01-15
100008,1953-05-14,Otmar,Selvestrel,M,1987-05-05
100011,1956-07-20,Shmuel,Birge,M,1989-11-23
Looks good.
H. Other database specific sqoop arguments/functionality
Available at:
http://archive.cloudera.com/cdh4/cdh/4/sqoop-1.4.2-cdh4.2.0/SqoopUserGuide.html#_compatibility_notes
That's it for this post. In my next post, I will cover Hive. Subsequent posts will cover exports, integration with Oozie, and finally integration with HBase. Hope this blog has been helpful.