Friday, 20 February 2015

Debugging common issues


Configuration Issues

Ø  Using an old version of Pig to run the script (0.6 vs 0.7).

Q. How to find out the version ?

A. Pick one of the failed M/R jobs. Look for the jobconf variable "java.class.path" to find the Pig release, and check the version on the command line.
Fix: pig -useversion 0.7 …
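As a quick cross-check, the release that the gateway launches by default can also be printed on the command line (exact output varies by release):

$ pig -version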

Ø  Job fails to start  - Too many mappers/reducers

Q: How did we know this ?

A: Find the failed job-id from the web UI and grep for it in the JobTracker (JT) logs. The exception tells us the problem.
Fix: Set a larger split/block size:
          pig -useversion 0.7 -Dmapred.min.split.size=…..
          pig -useversion 0.6 -Dpig.overrideBlockSize=…..
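A minimal sketch with an illustrative value (the split size is in bytes; 1073741824 = 1 GB, and the script name is hypothetical). Larger splits pack more data into each mapper, bringing the mapper count back under the limit:

$ pig -useversion 0.7 -Dmapred.min.split.size=1073741824 myscript.pig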

Ø  Mappers/Reducers failing with OOM.

Q: How do we know this is the case?

A: Pick a failed M/R job. Look at a failed task's stdout/stderr logs to see the stack trace.
Fix: pig -Dmapred.job.(map/reduce).memory.mb=[size in MB]
              -Dmapred.(map/reduce).child.java.opts="[jvm options]"
Note:-
  1. "*.java.opts" should be at least 200 MB less than "*memory.mb".
  2. Check the cluster config to find the default slot config for "*memory.mb": (gateway)$ cat $HADOOP_CONF_DIR/mapred-site.xml | grep -A1 "memory.mb"
Giving more memory might only be a temporary solution.
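A hedged example putting the two settings together (the sizes and script name are illustrative; note the 200 MB headroom between each slot size and its JVM heap):

$ pig -Dmapred.job.map.memory.mb=2048 \
      -Dmapred.map.child.java.opts="-Xmx1848m" \
      -Dmapred.job.reduce.memory.mb=3072 \
      -Dmapred.reduce.child.java.opts="-Xmx2872m" \
      myscript.pig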




 Script Issues

Ø  Using an unsuitable join – FR-Join

Q. How do we know if this is the problem?

A. Mappers/reducers executing the join fail with OOM. The failing job can be correlated to the join portion of the script using "explain".
Fix: Use a regular join. For an FR join, the input data of the "right" relation should be less than 150 MB.
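For reference, a sketch of both join flavours in Pig Latin (relation, field, and path names are illustrative); the replicated form loads the right-hand relation into memory on every mapper, which is why the size guideline applies:

$ cat > join_sketch.pig <<'EOF'
big   = LOAD '/data/big'   AS (id:long, val:chararray);
small = LOAD '/data/small' AS (id:long, name:chararray);
-- FR (replicated) join: 'small' must fit in mapper memory
fr    = JOIN big BY id, small BY id USING 'replicated';
-- regular join: runs in the reduce phase, no in-memory size limit
reg   = JOIN big BY id, small BY id;
STORE reg INTO '/data/joined';
EOF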

Ø  Failed to specify the number of reducers

Q. What happens in this case?

A. A single reducer runs for many hours and receives GBs of data. The job sometimes fails with the reducer running out of memory.
Fix: Set "default_parallel" in the script. Tuning the PARALLEL value for individual M/R boundary operators is also recommended (see the sketch below).
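A sketch showing both levels of control (the numbers, names, and paths are illustrative): default_parallel sets a script-wide reducer count, while PARALLEL overrides it on a heavy M/R boundary operator:

$ cat > parallel_sketch.pig <<'EOF'
SET default_parallel 40;                      -- script-wide default number of reducers
logs    = LOAD '/data/logs' AS (user:chararray, bytes:long);
by_user = GROUP logs BY user PARALLEL 100;    -- override for a heavy operator
totals  = FOREACH by_user GENERATE group, SUM(logs.bytes);
STORE totals INTO '/data/bytes_per_user';
EOF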

Ø  UDF issues - Bug in java code

Q. What indicates this ?

A. The exception stack trace in the stdout/stderr logs of a failed task from the last failed job of the script.
Fix: Inspect the code and add System.out debugging statements to nail down the problem. This requires help from the user.

Ø  UDF issues - Failing to ship dependent files/jars loaded in UDF

Q. What happens in this case ?

A. The M/R job fails with mappers/reducers throwing an IOException or a FileNotFoundException.
Fix: pig -Dpig.additional.jars=/local/path/to/jar
             -Dmapred.cache.files=hdfs://path/to/file#symlink2
             -Dmapred.create.symlink=yes



Data is corrupt  

Q. How can this be inferred and validated?


A.  A very small number (typically one) of mappers fail, leading to job failure and finally script failure.
Fix:
    1.  Get the split info of the failed mapper from the web UI.
    2.  Log on to the machine having the split.
    3.  Grep the datanode logs for the failed map attempt-id to get the 'block-id' info.
    4.  On that machine, run fsck to get the file information:
            hadoop fsck <input-path-in-pig-script> -files -blocks | grep -B1 "block-id"
    5.  Run the script with only this path as input to see it fail (a sketch follows).
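Step 5 is easy to script when the Pig script reads its input path from a parameter (the path and script name below are illustrative, and the script is assumed to do LOAD '$INPUT'):

$ pig -param INPUT=/data/suspect-dir/part-00042 myscript.pig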

Wednesday, 4 February 2015

Pig Introduction

What is Pig ?

Pig provides an engine for executing data flows in parallel on Hadoop. It includes a language, Pig Latin, for expressing these data flows. Pig Latin includes operators for many of the traditional data operations (join, filter, sort, etc.), as well as the ability for users to develop their own functions for reading, processing, and writing data.

Pig is an Apache open source project. This means users are free to download it as source or binary, use it for themselves, contribute to it, and—under the terms of the Apache License—use it in their products and change it as they see fit.

Pig on Hadoop

Pig runs on Hadoop. It makes use of both the Hadoop Distributed File System (HDFS) and Hadoop's processing system, MapReduce.

HDFS is a distributed filesystem that stores files across all of the nodes in a Hadoop cluster. It handles breaking the files into large blocks and distributing them across different machines, including making multiple copies of each block so that if any one machine fails no data is lost. It presents a POSIX-like interface to users. By default, Pig reads input files from HDFS, uses HDFS to store intermediate data between MapReduce jobs, and writes its output to HDFS.

MapReduce is a simple but powerful parallel data-processing paradigm. Every job in MapReduce consists of three main phases: map, shuffle, and reduce. In the map phase, the application has the opportunity to operate on each record in the input separately. Many maps are started at once so that while the input may be gigabytes or terabytes in size, given enough machines, the map phase can usually be completed in under one minute.


Part of the specification of a MapReduce job is the key on which data will be collected. For example, if you were processing web server logs for a website that required users to log in, you might choose the user ID to be your key so that you could see everything done by each user on your website. In the shuffle phase, which happens after the map phase, data is collected together by the key the user has chosen and distributed to different machines for the reduce phase. Every record for a given key will go to the same reducer.
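As a concrete illustration of choosing the key, here is a minimal Pig Latin sketch (field names and paths are assumptions, not from this post) that groups web-server log records by user ID, so the shuffle brings every record for a given user to the same reducer:

$ cat > hits_per_user.pig <<'EOF'
logs    = LOAD '/logs/access' USING PigStorage('\t')
          AS (user_id:chararray, url:chararray, ts:long);
by_user = GROUP logs BY user_id;              -- user_id becomes the shuffle key
counts  = FOREACH by_user GENERATE group AS user_id, COUNT(logs) AS hits;
STORE counts INTO '/logs/hits_per_user';
EOF
$ pig hits_per_user.pig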

Monday, 2 February 2015

Hadoop Core Components



Starting from the bottom of the diagram above, Hadoop’s ecosystem consists of the following:

HDFS — A foundational component of the Hadoop ecosystem is the Hadoop Distributed
File System (HDFS). HDFS is the mechanism by which a large amount of data can be
distributed over a cluster of computers, and data is written once, but read many times for
analytics. It provides the foundation for other tools, such as HBase.

MapReduce — Hadoop’s main execution framework is MapReduce, a programming model for distributed, parallel data processing, breaking jobs into mapping phases and reduce phases (thus the name). Developers write MapReduce jobs for Hadoop, using data stored in HDFS for fast data access. Because of the nature of how MapReduce works, Hadoop brings the processing to the data in a parallel fashion, resulting in fast implementation.

HBase — A column-oriented NoSQL database built on top of HDFS, HBase is used for fast
read/write access to large amounts of data. HBase uses Zookeeper for its management to
ensure that all of its components are up and running.

Zookeeper — Zookeeper is Hadoop’s distributed coordination service. Designed to run over a cluster of machines, it is a highly available service used for the management of Hadoop operations, and many components of Hadoop depend on it.

Oozie — A scalable workflow system, Oozie is integrated into the Hadoop stack, and is
used to coordinate execution of multiple MapReduce jobs. It is capable of managing a
significant amount of complexity, basing execution on external events that include timing
and presence of required data.

Pig — An abstraction over the complexity of MapReduce programming, the Pig platform
includes an execution environment and a scripting language (Pig Latin) used to analyze
Hadoop data sets. Its compiler translates Pig Latin into sequences of MapReduce programs.

Hive — An SQL-like, high-level language used to run queries on data stored in Hadoop, Hive enables developers not familiar with MapReduce to write data queries that are translated into MapReduce jobs in Hadoop. Like Pig, Hive was developed as an abstraction layer, but geared toward database analysts who are more familiar with SQL than with Java programming.
The Hadoop ecosystem also contains several frameworks for integration with the rest of the
enterprise:

Sqoop is a connectivity tool for moving data between relational databases or data
warehouses and Hadoop. Sqoop leverages the database to describe the schema of the imported/exported data, and uses MapReduce for parallel operation and fault tolerance.

Flume is a distributed, reliable, and highly available service for efficiently collecting,
aggregating, and moving large amounts of data from individual machines to HDFS. It
is based on a simple and flexible architecture, and provides streaming data flows. It
leverages a simple extensible data model, allowing you to move data from multiple machines within an enterprise into Hadoop.

Sunday, 1 February 2015

Apache Sqoop - Import data from mysql into HDFS

Apache Sqoop


Apache Sqoop is a tool designed for efficiently transferring bulk data in a distributed manner between Apache Hadoop and structured datastores such as relational databases, enterprise data warehouses, and NoSQL systems.  Sqoop can be used to import data into HBase, HDFS, and Hive, and to export data out of Hadoop into an RDBMS, in an automated fashion, leveraging Oozie for scheduling.  It has a connector-based architecture that supports plugins providing connectivity to new external systems.

Behind the scenes, the dataset being transferred is split into partitions, and a map-only job is launched, with each mapper transferring the slice of the dataset assigned to it.  Sqoop uses the database metadata to infer the data types and handles the data in a type-safe manner.


The following diagrams are from the Apache documentation...


Import process:



Export process:


Supported databases:

Database      Version     --direct support?      Connect string matches
HSQLDB        1.8.0+      No                     jdbc:hsqldb:*//
MySQL         5.0+        Yes                    jdbc:mysql://
Oracle        10.2.0+     No                     jdbc:oracle:*//
PostgreSQL    8.3+        Yes (import only)      jdbc:postgresql://
 

A) What's covered in this post:

This blog is mostly notes for myself from what I have learned - with examples that can be tried out.

I used the documentation at the link below for my self-study:
http://archive.cloudera.com/cdh4/cdh/4/sqoop-1.4.2-cdh4.2.0/SqoopUserGuide.html

Versions covered:
Sqoop (1.4.2) with MySQL (5.1.69)

Topics covered:
B) MySql database setup and sample data
C) Installing Sqoop
D) Download and save the MySQL driver 
E) sqoop list commands
F) Importing data into HDFS
    F1. Importing a table into HDFS - basic import
    F2. Executing imports with an options file for static information
    F3. Import all rows of a table in mySQL, but specific columns of the table
    F4. Import all columns, filter rows using where clause
    F5. Import with a free form query without where clause
    F6. Import with a free form query with where clause
    F7. Controlling parallelism and what's with the $CONDITIONS in the where clause?
    F8. Direct connector
    F9. Import formats
    F10. Split by
    F11. Boundary query
    F12. Fetch size
    F13. Compression
    F14. Incremental imports
    F15. Output line formatting options
    F16. Import all tables
G.  Other functionality
    G1. Direct and quick queries or inserts and updates with sqoop eval
    G2. Sqoop code-gen
    G3. Sqoop merge

Subsequent blogs will cover importing into Hive and HBase, and export out of HDFS into mysql.

My blogs on Sqoop:

Blog 1: Import from mysql into HDFS
Blog 2: Import from mysql into Hive
Blog 3: Export from HDFS and Hive into mysql
Blog 4: Sqoop best practices
Blog 5: Scheduling of Sqoop tasks using Oozie
Blog 6: Sqoop2

Your thoughts/updates:
If you want to share your thoughts/updates, email me at poganhadoop@gmail.com.

B) MySql Setup:

Step 1: Installing MySql and logging onto MySql

1. Install MySql server
sudo yum install mysql-server

2. To start MySql..
sudo service mysqld start

3. Set root password
Run the command below and follow steps:
sudo /usr/bin/mysql_secure_installation

4. Install MySql client
sudo yum install mysql

5. Check what got installed 
sudo rpm -qa|grep -i mysql

Here is the output from my installation:
----------------------------------------------------
mysql-server-5.1.69-1.el6_4.x86_64
mysql-5.1.69-1.el6_4.x86_64
perl-DBD-MySQL-4.013-3.el6.x86_64
mysql-libs-5.1.69-1.el6_4.x86_64
----------------------------------------------------

6. Verify if MySql is running
sudo service mysqld status

7. Configure MySql services to be enabled at boot time..
sudo chkconfig --levels 235 mysqld on

8. Login to MySql
To login as root, from Unix command prompt, enter the command below; Enter password;
mysql -u root -p

9. Create a user
create user 'airawat'@'localhost' identified by 'myPassword';

10. Login as user
mysql -u airawat -p

Step 2: Load sample data

a) Load the sample data available at the link below, following instructions detailed at the site.
http://dev.mysql.com/doc/employee/en/index.html

b) Use the following command to load the employee database, while logged in as root..
mysql -u root -p -t < employees.sql

c) Validate the install by running through the following commands in the MySql shell-

show databases;  -- Should see employees
use employees;   -- Use the employees database
show tables;     -- Lists the tables, which can be validated against the instructions for the sample data setup

d) Next, still logged in as root, grant the user access to the database.
grant ALL on employees.* to airawat@localhost identified by 'myPassword';

e) Exit out from root and login as the user you set up, and get familiar with the tables
mysql -u airawat -p employees

Now we have the data available to start playing with Sqoop.

C) Installing Sqoop

Refer to the Apache Sqoop website for installation instructions.
Sqoop is a client-side tool, so it needs to be installed only on the client on which you wish to use Sqoop.

D) Download and save the MySQL driver 

Download the MySQL JDBC driver, mysql-connector-java-5.1.25-bin.jar,
            from http://dev.mysql.com/downloads/connector/j/
            and place it in $SQOOP_HOME/lib on the node running Sqoop.

Note: If you installed CDH, copy to 
/opt/cloudera/parcels/CDH-4.2.0-1.cdh4.2.0.p0.10/lib/sqoop/lib/
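For a plain (tarball) install, the copy is just the following, assuming the jar was downloaded to the current directory:

$ cp mysql-connector-java-5.1.25-bin.jar $SQOOP_HOME/lib/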


E) sqoop list commands

Run the commands on the Unix prompt, on the node where you have sqoop installed.

E.1. List databases

Lists the databases on your MySQL server.

$ sqoop list-databases --connect jdbc:mysql://<<mysql-server>>/employees --username airawat --password myPassword
.
.
.
13/05/31 16:45:58 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
information_schema
employees
test

E.2. List tables

Lists the tables in your MySQL database.

$ sqoop list-tables --connect jdbc:mysql://<<mysql-server>>/employees --username airawat --password myPassword
.
.
.
13/05/31 16:45:58 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
departments
dept_emp
dept_manager
employees
employees_exp_stg
employees_export
salaries
titles

F) Importing data from MySQL into HDFS 

Replace "airawat-mySqlServer-node" with the host name of the node running the MySQL server, and adjust the login credentials and target directory.

F1. Importing a table into HDFS - basic import

$ sqoop import \
--connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID \
--password myPWD \
--table employees \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/employees
.
.
.
.9139 KB/sec)
13/05/31 22:32:25 INFO mapreduce.ImportJobBase: Retrieved 300024 records

F2. Executing imports with an options file for static information

Rather than repeating the connection-related arguments with every import command, you can pass an options file as an argument to sqoop.  Create a text file as follows and save it locally on the node on which you are running the sqoop client.  

Note: This blog does not cover it, but you can also create and use a password file, which you can pass with the argument --password-file <<filename>>; a hedged sketch follows below.
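If your Sqoop build supports --password-file, a hedged sketch looks like this (the file location is illustrative, and depending on the version the path may need to be on HDFS rather than a file:// URI):

$ echo -n "myPWD" > /home/airawat/.mysql.password
$ chmod 400 /home/airawat/.mysql.password
$ sqoop import --connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID \
--password-file file:///home/airawat/.mysql.password \
--table departments \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/departments_pwfile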

F2.1. Sample Options file:
___________________________________________________________________________
$ vi SqoopImportOptions.txt 
#
#Options file for sqoop import
#

import
--connect
jdbc:mysql://airawat-mySqlServer-node/employees
--username
myUID
--password
myPwd

#
#All other commands should be specified in the command line
___________________________________________________________________________

F2.2. Command to execute import, using an options file:


Note: Provide the proper path for the options file

F2.2.1. The command

$ sqoop --options-file SqoopImportOptions.txt \
--table departments \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/departments
.
.
.
13/05/31 22:48:55 INFO mapreduce.ImportJobBase: Transferred 153 bytes in 26.2453 seconds (5.8296 bytes/sec)
13/05/31 22:48:55 INFO mapreduce.ImportJobBase: Retrieved 9 records.


The -m argument specifies the number of mappers.  The departments table has only a handful of records, so I am setting it to 1.


F2.2.2. Files created in HDFS:

$ hadoop fs -ls -R sqoop-mysql/
drwxr-xr-x   - airawat airawat          0 2013-05-31 22:48 sqoop-mysql/departments
-rw-r--r--   3 airawat airawat          0 2013-05-31 22:48 sqoop-mysql/departments/_SUCCESS
drwxr-xr-x   - airawat airawat          0 2013-05-31 22:48 sqoop-mysql/departments/_logs
drwxr-xr-x   - airawat airawat          0 2013-05-31 22:48 sqoop-mysql/departments/_logs/history
-rw-r--r--   3 airawat airawat      79467 2013-05-31 22:48 sqoop-mysql/departments/_logs/history/cdh-jt01_1369839495962_job_201305290958_0062_conf.xml
-rw-r--r--   3 airawat airawat      12441 2013-05-31 22:48 sqoop-mysql/departments/_logs/history/job_201305290958_0062_1370058514473_ airawat_departments.jar
-rw-r--r--   3 airawat airawat        153 2013-05-31 22:48 sqoop-mysql/departments/part-m-00000


F2.2.3. Data file contents:

$ hadoop fs -cat sqoop-mysql/departments/part-m-00000 | more

d009,Customer Service
d005,Development
d002,Finance
d003,Human Resources
d001,Marketing
d004,Production
d006,Quality Management
d008,Research
d007,Sales

F3. Import all rows of a table in mySQL, but specific columns of the table

Note: This did not work for me; I am merely providing the statement and will check back to see whether this is a defect in my syntax or a Sqoop defect. It might be related to the case of the column names as defined in the database versus in the sqoop import statement.


$ sqoop --options-file SqoopImportOptions.txt \
--table  dept_emp \
--columns "EMP_NO,DEPT_NO,FROM_DATE,TO_DATE" \
--as-textfile \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/DeptEmp

Error: 
13/05/31 23:01:53 ERROR util.SqlTypeMap: It seems like you are looking up a column that does not
13/05/31 23:01:53 ERROR util.SqlTypeMap: exist in the table. Please ensure that you've specified
13/05/31 23:01:53 ERROR util.SqlTypeMap: correct column names in Sqoop options.
13/05/31 23:01:53 ERROR tool.ImportTool: Imported Failed: column not found: ‘EMP_NO'
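If the case mismatch is indeed the cause, a hedged retry with the column names in lowercase (which is how the employees sample schema defines them) would look like this:

$ sqoop --options-file SqoopImportOptions.txt \
--table dept_emp \
--columns "emp_no,dept_no,from_date,to_date" \
--as-textfile \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/DeptEmp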


F4. Import all columns, filter rows using where clause

 $ sqoop --options-file SqoopImportOptions.txt \
--table employees  \
--where "emp_no > 499948" \
--as-textfile \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/employeeGtTest

F5. Import with a free form query without where clause

 $ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where  $CONDITIONS' \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/employeeFrfrmQrySmpl2

(The case of the column names needs to match that used to create the table, or else the import fails.)

F6. Import with a free form query with where clause

$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where EMP_NO < 20000 AND $CONDITIONS' \
-m 1 \
--target-dir /user/airawat/sqoop-mysql/employeeFrfrmQry1

F7. Controlling parallelism and what's with the $CONDITIONS in the where clause?

This section is straight from the Apache User Guide.

$CONDITIONS

If you want to import the results of a query in parallel, then each map task will need to execute a copy of the query, with results partitioned by bounding conditions inferred by Sqoop. Your query must include the token $CONDITIONS, which each Sqoop process will replace with a unique condition expression. You must also select a splitting column with --split-by.

Controlling parallelism
Sqoop imports data in parallel from most database sources. You can specify the number of map tasks (parallel processes) to use to perform the import by using the -m or --num-mappers argument. Each of these arguments takes an integer value which corresponds to the degree of parallelism to employ. By default, four tasks are used. Some databases may see improved performance by increasing this value to 8 or 16. Do not increase the degree of parallelism greater than that available within your MapReduce cluster; tasks will run serially and will likely increase the amount of time required to perform the import. Likewise, do not increase the degree of parallelism higher than that which your database can reasonably support. Connecting 100 concurrent clients to your database may increase the load on the database server to a point where performance suffers as a result.

When performing parallel imports, Sqoop needs a criterion by which it can split the workload. Sqoop uses a splitting column to split the workload. By default, Sqoop will identify the primary key column (if present) in a table and use it as the splitting column. The low and high values for the splitting column are retrieved from the database, and the map tasks operate on evenly-sized components of the total range. For example, if you had a table with a primary key column of id whose minimum value was 0 and maximum value was 1000, and Sqoop was directed to use 4 tasks, Sqoop would run four processes which each execute SQL statements of the form SELECT * FROM sometable WHERE id >= lo AND id < hi, with (lo, hi) set to (0, 250), (250, 500), (500, 750), and (750, 1001) in the different tasks.

If the actual values for the primary key are not uniformly distributed across its range, then this can result in unbalanced tasks. You should explicitly choose a different column with the --split-by argument. For example, --split-by employee_id

Note: Sqoop cannot currently split on multi-column indices. If your table has no index column, or has a multi-column key, then you must also manually choose a splitting column.

F8. Direct connector

By default, the import process uses JDBC.  Performance can be improved by using database-specific, native data movement tools. For example, MySQL provides the mysqldump tool, which can export data from MySQL to other systems very quickly. When we provide the "--direct" argument, we are specifying that Sqoop should attempt the direct import channel.

Note: Currently, direct mode does not support imports of large object columns.

$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where EMP_NO < 20000 AND $CONDITIONS' \
-m 1 \
--direct \
--target-dir /user/airawat/sqoop-mysql/employeeUsingDirect

F9. Import formats

With MySQL in direct mode, a text file is the only supported format; Avro and SequenceFile formatted imports are feasible with other RDBMSs - refer to the Apache Sqoop documentation for more information.

Text file is the default format.

F10. Split by

Refer section on controlling parallelism..

$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/SplitByExampleImport

F11. Boundary query

Again related to controlling parallelism..


By default, Sqoop uses the query "select min(<split-by>), max(<split-by>) from <table name>" to find the boundaries for creating splits. In some cases this query is not the most optimal, so you can specify any arbitrary query returning two numeric columns using the --boundary-query argument.
(Note: This did not work for me; I got an error. I need to try again to see whether it is an error on my end or Sqoop-related.)

$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--boundary-query "SELECT MIN(EMP_NO), MAX(EMP_NO) from employees" \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/BoundaryQuerySample


F12. Fetch size

This argument tells Sqoop how many entries to read from the database at once.
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--fetch-size=50000 \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/FetchSize

F13. Compression

Use the --compress argument to enable compression; if you don't specify a compression codec (--compression-codec), the default, gzip, will be used.

The command:
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
-z \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/CompressedSample

The output:
$ hadoop fs -ls -R sqoop-mysql/CompressedSample | grep part*

-rw-r--r--   3 airawat airawat     896377 2013-05-31 23:49 sqoop-mysql/CompressedSample/part-m-00000.gz
-rw-r--r--   3 airawat airawat     499564 2013-05-31 23:49 sqoop-mysql/CompressedSample/part-m-00001.gz
-rw-r--r--   3 airawat airawat     409199 2013-05-31 23:49 sqoop-mysql/CompressedSample/part-m-00002.gz
-rw-r--r--   3 airawat airawat     907330 2013-05-31 23:49 sqoop-mysql/CompressedSample/part-m-00003.gz


F14. Incremental imports

F14.1. Prep

Import part of the employee table ahead of time..

The command:

$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where EMP_NO < 15000 AND $CONDITIONS' \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/IncrementalImports

The number of records imported:
$ hadoop fs -ls -R sqoop-mysql/IncrementalImports |grep part* | awk '{print $8}' |xargs hadoop fs -cat | wc -l

4999

F14.2. Run the incremental import

The following three arguments need to be specified.

Arguments        
--check-column (col): Specifies the column to be examined when determining which rows to import.
--incremental (mode): Specifies how Sqoop determines which rows are new. Legal values for mode include append and lastmodified.
--last-value (value): Specifies the maximum value of the check column from the previous import.

Command

$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--check-column EMP_NO \
--incremental append \
--last-value 14999 \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/IncrementalImports

Record count in mysql:

mysql> select count(*) from employees;

+----------+
| count(*) |
+----------+
|   300024 |
+----------+
1 row in set (0.12 sec)

Record count in HDFS:

$ hadoop fs -ls -R sqoop-mysql/IncrementalImports |grep part* | awk '{print $8}' |xargs hadoop fs -cat | wc -l

300024

F15. Output line formatting options


Refer Apache sqoop documentation for more on this topic..

$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--fields-terminated-by , \
--escaped-by \\ \
--enclosed-by '\"' \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/LineFormattingOptions

There is an argument called "--mysql-delimiters".  It uses MySQL’s default delimiter set: fields: , lines: \n escaped-by:\ optionally-enclosed-by: '
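For comparison, a hedged sketch that relies on --mysql-delimiters instead of spelling out each option (the target directory is illustrative):

$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--mysql-delimiters \
--split-by EMP_NO \
--target-dir /user/airawat/sqoop-mysql/MysqlDelimiters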

F16. Import all tables


Command:
$ sqoop --options-file SqoopImportAllTablesOptions.txt \
--direct \
--warehouse-dir sqoop-mysql/EmployeeDatabase

Options file content:
$ more SqoopImportAllTablesOptions.txt
______________________________________________

#
#Options file for sqoop import
#

import-all-tables
--connect
jdbc:mysql://airawat-mySqlServer-node/employees
--username
myUID
--password
myPWD

#
#All other commands should be specified in the command line
______________________________________________


Files generated:
$ hadoop fs -ls -R sqoop-mysql/EmployeeDatabase/*/part* | awk '{print $8}'


sqoop-mysql/EmployeeDatabase/departments/part-m-00000
sqoop-mysql/EmployeeDatabase/departments/part-m-00001
sqoop-mysql/EmployeeDatabase/departments/part-m-00002
sqoop-mysql/EmployeeDatabase/departments/part-m-00003
sqoop-mysql/EmployeeDatabase/dept_emp/part-m-00000
sqoop-mysql/EmployeeDatabase/dept_emp/part-m-00001
sqoop-mysql/EmployeeDatabase/dept_emp/part-m-00002
sqoop-mysql/EmployeeDatabase/dept_emp/part-m-00003
sqoop-mysql/EmployeeDatabase/dept_manager/part-m-00000
sqoop-mysql/EmployeeDa...........

G1. Direct and quick queries or inserts and updates with sqoop eval

From the Apache Sqoop user guide - "The eval tool allows users to quickly run simple SQL queries against a database; results are printed to the console. This allows users to preview their import queries to ensure they import the data they expect."

G1.1. Query:

$ sqoop eval --connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID \
--password myPWD \
--query "select * from employees limit 2"

---------------------------------------------------------------------------------
| emp_no      | birth_date | first_name     | last_name        | gender | hire_date  |
---------------------------------------------------------------------------------
| 10001       | 1953-09-02 | Georgi         | Facello          | M | 1986-06-26 |
| 10002       | 1964-06-02 | Bezalel        | Simmel           | F | 1985-11-21 | 

G1.2. Insert:

sqoop eval --connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID \
--password myPWD \
-e "insert into employees_export values(550000,'1977-08-08','Mouse','Mickey','M','1999-04-12')"

G2. Sqoop code-gen

The codegen tool generates Java classes which encapsulate and interpret imported records. The Java definition of a record is instantiated as part of the import process, but can also be performed separately. For example, if Java source is lost, it can be recreated. New versions of a class can be created which use different delimiters between fields, and so on.


$ hadoop fs -mkdir sqoop-mysql/jars

$ sqoop codegen --connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID \
--password myPWD \
--table employees \
--outdir /user/airawat/sqoop-mysql/jars

--Note: Sqoop could not create the directory; I created it and tried again, and it failed yet again.  I need to look into this.

13/05/31 16:19:24 ERROR orm.CompilationManager: Could not make directory: /user/airawat/sqoop-mysql/jars
13/05/31 16:19:24 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-airawat/compile/c88afaebb5744b4b2e45b6119540834a/employees.jar

G3. Sqoop merge

The merge tool allows you to combine two datasets where entries in one dataset should overwrite entries of an older dataset. For example, an incremental import run in last-modified mode will generate multiple datasets in HDFS where successively newer data appears in each dataset. The merge tool will "flatten" two datasets into one, taking the newest available records for each primary key.

G3.1. Create test data in Mysql

G3.1.1 - Initial dataset

mysql> create table initial_emp as
select emp_no,birth_date, first_name,last_name,gender,hire_date  from employees where emp_no <= 100000
union
select emp_no,birth_date, 'null' as first_name,last_name,gender,hire_date  from employees where emp_no > 100000 and emp_no <= 300000;

--I created a table with fewer records than the employee table, and with some nulls for first name; My next import will include the records from the employee table with emp_no > 100000 and <= 300000, and also records with emp_no > 300000; With the Merge, we should see the final dataset with the full employee table;

G3.1.2 - Incremental dataset

mysql> create table final_emp as
select emp_no,birth_date,first_name,last_name,gender,hire_date  from employees where emp_no > 100000 and emp_no <= 300000
Union
select emp_no,birth_date,first_name,last_name,gender,hire_date  from employees where emp_no > 300000;

G3.2. Import the first dataset into HDFS


$ sqoop --options-file SqoopImportOptions.txt \
--query 'select emp_no,birth_date, first_name,last_name,gender,hire_date  from initial_emp where $CONDITIONS' \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/InitialDataSet

hadoop fs -ls -R sqoop-mysql/InitialDataSet |grep part* | awk '{print $8}' |xargs hadoop fs -cat | wc -l

200024

$ hadoop fs -ls -R sqoop-mysql/InitialDataSet |grep part* | awk '{print $8}' | xargs hadoop fs -cat | grep 'null' | wc -l

110024

G3.3. Import the last dataset into HDFS


$ sqoop --options-file SqoopImportOptions.txt \
--query 'select emp_no,birth_date,first_name,last_name,gender, hire_date  from final_emp where $CONDITIONS' \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/FinalDataSet


$ hadoop fs -ls -R sqoop-mysql/FinalDataSet |grep part* | awk '{print $8}' |xargs hadoop fs -cat | wc -l

210024

G3.4. Generate jar and class file for employee table


$ sqoop codegen --connect jdbc:mysql://cdh-dev01/employees \
--username myUID \
--password myPWD \
--table employees \
--outdir /user/airawat/sqoop-mysql/jars
.
.
.
13/06/03 10:29:18 ERROR orm.CompilationManager: Could not make directory: /user/airawat/sqoop-mysql/jars
13/06/03 10:29:18 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-airawat/compile/879394521045bc924ad9321fe46374bc/employees.jar

Files created:
$ ls /tmp/sqoop-airawat/compile/879394521045bc924ad9321fe46374bc/
employees.class  employees.jar  employees.java

Copy files to your home directory:
cp /tmp/sqoop-airawat/compile/879394521045bc924ad9321fe46374bc/* .

G3.5. Execute the merge


$ sqoop merge --new-data sqoop-mysql/FinalDataSet --onto sqoop-mysql/InitialDataSet --target-dir sqoop-mysql/MergedDataSet \
    --jar-file employees.jar --class-name employees --merge-key emp_no

Note: When I tried running this command with "EMP_NO" as the merge key instead of "emp_no", I got errors.

$ hadoop fs -ls -R sqoop-mysql/MergedDataSet |grep part* | awk '{print $8}' |xargs hadoop fs -cat | wc -l

300024

We have the expected number of records - 300024. Now let's check whether any have 'null' in them; they should not.

$ hadoop fs -ls -R sqoop-mysql/MergedDataSet |grep part* | awk '{print $8}' | xargs hadoop fs -cat | grep 'null' | wc -l

0

A quick look at the data...

$ hadoop fs -cat sqoop-mysql/MergedDataSet/part-r-00000 | more

100000,1956-01-11,Hiroyasu,Emden,M,1991-07-02
100002,1957-03-04,Claudi,Kolinko,F,1988-02-20
100004,1960-04-16,Avishai,Nitsch,M,1986-01-03
100006,1956-07-13,Janalee,Himler,F,1986-01-15
100008,1953-05-14,Otmar,Selvestrel,M,1987-05-05
100011,1956-07-20,Shmuel,Birge,M,1989-11-23

Looks good.

H. Other database specific sqoop arguments/functionality

Available at:
http://archive.cloudera.com/cdh4/cdh/4/sqoop-1.4.2-cdh4.2.0/SqoopUserGuide.html#_compatibility_notes


That's it for this post.  In my next post, I will cover Hive.  Subsequent posts will cover exports, integration with Oozie, and finally integration with HBase.  Hope this blog has been helpful.