Monday, 8 July 2019

HIVE FILE-FORMAT


Apache Hive

Apache Hive is open source data warehouse software that facilitates querying and managing large datasets residing in distributed storage. Hive provides a SQL-like query language called HiveQL, which handles structured data only.
By default, Hive uses an embedded Derby database to store its metastore (table definitions and other metadata); the table data itself is eventually stored in files on the underlying file system. There are some specific file formats which Hive can handle, such as:
  • TEXTFILE
  • SEQUENCEFILE
  • RCFILE
  • ORCFILE

File Format
A file format is a way in which information is stored or encoded in a computer file. In Hive it refers to how records are stored inside the file. Since we are dealing with structured data, each record has the same structure, and how those records are encoded in a file defines the file format.
These file formats mainly differ in data encoding, compression rate, use of space and disk I/O.
Hive does not verify whether the data you are loading matches the table schema; however, it does verify that the file format matches the table definition.
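For example, the storage format of an existing table can be checked with DESCRIBE FORMATTED (a quick sketch, assuming the employee table created in the TEXTFILE example below):
hive> DESCRIBE FORMATTED employee;
The InputFormat and OutputFormat lines in the output show the storage format the table was defined with.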

TEXTFILE
TEXTFILE is the most familiar input/output format in Hadoop and the default in Hive. If we define a table as TEXTFILE it can load data from CSV (Comma Separated Values) files, from files delimited by tabs or spaces, and from JSON data. This means the fields in each record should be separated by a comma, space or tab, or the record may be JSON (JavaScript Object Notation) data.
By default, if we use TEXTFILE format then each line is considered as a record.
Storing in TEXTFILE
Creation of table in Hive which will store data in TEXTFILE
/*---------------------------------------------------------------------------------------------------------------------*/
hive> CREATE TABLE IF NOT EXISTS employee ( eid int, name String, salary String, destination String)
/*----- The clauses below set a table comment, the row format (field and line terminators) and the storage file type. -----*/
COMMENT 'Employee details'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
/*---------------------------------------------------------------------------------------------------------------------*/
Load Data Statement
Generally, after creating a table in SQL, we can insert data using the Insert statement. But in Hive, we can insert data using the LOAD DATA statement.
While inserting data into Hive, it is better to use LOAD DATA to store bulk records. There are two ways to load data: one is from local file system and second is from Hadoop file system.
Syntax for loading data is as follows:
LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename;
//LOCAL is identifier to specify the local path. It is optional.
//OVERWRITE is optional to overwrite the data in the table.
Example:
hive> LOAD DATA LOCAL INPATH '/home/user/sample.txt' OVERWRITE INTO TABLE employee;
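To confirm the load worked, a quick sanity check against the table defined above (a minimal sketch):
hive> SELECT eid, name, salary FROM employee LIMIT 5;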

SEQUENCEFILE
We know that Hadoop performs better with a small number of large files than with a large number of small files. A file is considered small if it is significantly smaller than the HDFS block size. Many small files mean a lot of metadata, which becomes an overhead for the NameNode. Sequence files were introduced in Hadoop to solve this problem: they act as a container in which many small files can be packed.
Sequence files are flat files consisting of binary key-value pairs. When Hive converts queries to MapReduce jobs, it decides on the appropriate key-value pairs to use for a given record. Sequence files are binary files that can be split, and their main use is to club two or more smaller files together into a single sequence file.
In Hive we can create a sequence file table by specifying STORED AS SEQUENCEFILE at the end of a CREATE TABLE statement.
There are three types of sequence files:
• Uncompressed key/value records.
• Record compressed key/value records – only 'values' are compressed here
• Block compressed key/value records – both keys and values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable.

Hive has its own SEQUENCEFILE reader and SEQUENCEFILE writer libraries for reading and writing through sequence files.

Storing in SEQUENCEFILE
/*---------------------------------------------------------------------------------------------------------------------*/
create table olympic_sequencefile(athlete STRING, age INT, country STRING, year STRING, closing STRING, sport STRING, gold INT, silver INT, bronze INT, total INT)
/*----- The clauses below set the row format (field and line terminators) and the storage file type. -----*/
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS SEQUENCEFILE;
/*---------------------------------------------------------------------------------------------------------------------*/
So to load the data into SEQUENCEFILE we need to use the following approach:
INSERT OVERWRITE TABLE olympic_sequencefile
SELECT * FROM olympic;
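If block compression is wanted, a few session settings would typically be set before running that INSERT. A sketch using the older property names; newer Hadoop releases use the mapreduce.output.fileoutputformat.compress.* equivalents:
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
INSERT OVERWRITE TABLE olympic_sequencefile SELECT * FROM olympic;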
Loading data from a file (the general syntax is the same for any table):
# To overwrite the existing data in the table use -
LOAD DATA INPATH '/home/hadoop/data/olympicseq' OVERWRITE INTO TABLE olympic_sequencefile;
# To append to the data in the table use -
LOAD DATA INPATH '/home/hadoop/data/olympicseq' INTO TABLE olympic_sequencefile;
Note that LOAD DATA only moves files into the table's storage directory; it does not convert them, so the files must already be in the table's file format (here, sequence files).
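The INSERT example above assumes a plain-text staging table named olympic already exists. A minimal sketch of how such a table might be created and loaded (the path is illustrative):
CREATE TABLE IF NOT EXISTS olympic(athlete STRING, age INT, country STRING, year STRING, closing STRING, sport STRING, gold INT, silver INT, bronze INT, total INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
LOAD DATA LOCAL INPATH '/home/hadoop/data/olympic_data.tsv' INTO TABLE olympic;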

RCFILE
RCFILE stands for Record Columnar File. It is another binary file format, and it offers a high compression rate on top of the rows.
RCFILE is used when we want to perform operations on multiple rows at a time.
RCFILEs are flat files consisting of binary key/value pairs and share many similarities with SEQUENCEFILE. RCFILE stores the columns of a table in a record-columnar manner: it first partitions rows horizontally into row splits, and then vertically partitions each row split in a columnar way. For each row split, RCFILE stores the metadata as the key part of a record and all the data of the row split as the value part. This means RCFILE favours column-oriented storage over row-oriented storage.
Column-oriented storage is very useful when performing analytics, since queries usually touch only a subset of the columns.
Storing in RCFILE
/*---------------------------------------------------------------------------------------------------------------------*/
create table olympic_rcfile(athlete STRING, age INT, country STRING, year STRING, closing STRING, sport STRING, gold INT, silver INT, bronze INT, total INT)
/*----- The clauses below set the row format (field and line terminators) and the storage file type. -----*/
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS RCFILE;
/*---------------------------------------------------------------------------------------------------------------------*/
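As with the sequence-file table, the RCFILE table would typically be populated from an existing text-backed table rather than with LOAD DATA. A sketch, assuming the olympic staging table described above:
INSERT OVERWRITE TABLE olympic_rcfile SELECT * FROM olympic;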

ORCFILE
ORC stands for Optimized Row Columnar, which means it can store data in a more optimized way than the other file formats. ORC can reduce the size of the original data by up to 75% (e.g. a 100 GB file may shrink to about 25 GB), and as a result the speed of data processing also increases. ORC shows better performance than the TEXTFILE, SEQUENCEFILE and RCFILE formats.
An ORC file contains row data in groups called stripes, along with a file footer. The ORC format improves performance when Hive is reading, writing and processing data.
Storing in ORCFILE
/*---------------------------------------------------------------------------------------------------------------------*/
create table olympic_orcfile(athlete STRING, age INT, country STRING, year STRING, closing STRING, sport STRING, gold INT, silver INT, bronze INT, total INT)
/*----- The clauses below set the row format (field and line terminators) and the storage file type. -----*/
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS ORC;
/*---------------------------------------------------------------------------------------------------------------------*/
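The ORC table would again be populated with an INSERT, and the compression codec can be tuned per table through TBLPROPERTIES. A sketch, assuming the olympic staging table described earlier:
INSERT OVERWRITE TABLE olympic_orcfile SELECT * FROM olympic;
-- Compression can be chosen per table, for example:
-- CREATE TABLE ... STORED AS ORC TBLPROPERTIES ("orc.compress"="SNAPPY");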

Saturday, 15 June 2019

Big-Data File-Formats


Evaluation Framework: Row vs. Column
At the highest level, column-based storage is most useful when performing analytics queries that require only a subset of columns examined over very large data sets.
If your queries require access to all or most of the columns of each row of data, row-based storage will be better suited to your needs.
To help illustrate the differences between row and column-based data, consider this table of basic transaction data. For each transaction, we have the customer name, the product ID, sale amount, and the date.

Row-based storage is the simplest form of data table and is used in many applications, from web log files to highly structured database systems such as MySQL and Oracle.
In a database, this data would be stored by row, as follows: Emma,Prod1,100.00,2018-04-02;Liam,Prod2,79.99,2018-04-02;Noah,Prod3,19.99,2018-04-01;Olivia,Prod2,79.99,2018-04-03

Column-based data formats, as you might imagine, store data by column. Using our transaction data as an example, in a columnar database this data would be stored as follows: Emma,Liam,Noah,Olivia;Prod1,Prod2,Prod3,Prod2;100.00,79.99,19.99,79.99;2018-04-02,2018-04-02,2018-04-01,2018-04-03
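For example, an analytics query that sums sales for a single day needs only two of the four columns, so with column-based storage only those columns have to be read from disk. A sketch with hypothetical table and column names:
SELECT SUM(sale_amount) FROM transactions WHERE sale_date = '2018-04-02';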

Evaluation Framework: Schema Evolution
When we talk about “schema” in a database context, we are really talking about its organization—the tables, columns, views, primary keys, relationships, etc. When we talk about schemas in the context of an individual dataset or data file, it’s helpful to simplify schema further to the individual attribute level (column headers in the simplest use case). The schema will store the definition of each attribute and its type. Unless your data is guaranteed to never change, you’ll need to think about schema evolution, or how your data schema changes over time. How will your file format manage fields that are added or deleted?
One of the most important considerations when selecting a data format is how it manages schema evolution. When evaluating schema evolution specifically, there are a few key questions to ask of any data format:
  • How easy is it to update a schema (such as adding a field, removing or renaming a field)?
  • How will different versions of the schema “talk” to each other?
  • Is it human-readable? Does it need to be?
  • How fast can the schema be processed?
  • How does it impact the size of data?
Evaluation Framework: Splitability
Datasets are commonly composed of hundreds to thousands of files, each of which may contain thousands to millions of records or more. Furthermore, these file-based chunks of data are often being generated continuously. Processing such datasets efficiently usually requires breaking the job up into parts that can be farmed out to separate processors. In fact, large-scale parallelization of processing is key to performance. Your choice of file format can critically affect the ease with which this parallelization can be implemented.
Row-based formats, such as Avro, can be split along row boundaries, as long as the processing can proceed with one record at a time. If groups of records related by some particular column value are required for processing, out-of-the box partitioning may be more challenging for row-based data stored in random order.
A column-based format will be more amenable to splitting into separate jobs if the query calculation is concerned with a single column at a time. The columnar formats we discuss in this paper are row-columnar, which means they take a batch of rows and store that batch in columnar format. These batches then become split boundaries.

Evaluation Framework: Compression
Data compression reduces the amount of information needed for the storage or transmission of a given set of data. It reduces the resources required to store and transmit data, typically saving time and money. Compression uses encoding for frequently repeating data to achieve this reduction, done at the source of the data before it is stored and/or transmitted. Simply reducing the size of a data file can be referred to as data compression.
Columnar data can achieve better compression rates than row-based data. Storing values by column, with the same type next to each other, allows you to do more efficient compression on them than if you’re storing rows of data. For example, storing all dates together in memory allows for more efficient compression than storing data of various types next to each other—such as string, number, date, string, date.
While compression may save on storage costs, it is important to also consider compute costs and resources. Chances are, at some point you will want to decompress that data for use in another application. Decompression is not free—it incurs compute costs. If and how you compress the data will be a function of how you want to optimize the compute costs vs. storage costs for your given use case.

APACHE AVRO: A ROW BASED FORMAT
Apache Avro was released by the Hadoop working group in 2009. It is a row-based format that is highly splittable. The innovative, key feature of Avro is that the schema travels with the data. The data definition is stored in JSON format while the data is stored in binary format, minimizing file size and maximizing efficiency. Avro features robust support for schema evolution by managing added fields, missing fields, and fields that have changed. This allows old software to read new data and new software to read old data, a critical feature if your data has the potential to change.
We understand this intuitively—as soon as you’ve finished what you’re sure is the master schema to end all schemas, someone will come up with a new use case and request to add a field. This is especially true for big, distributed systems in large corporations. With Avro’s capacity to manage schema evolution, it’s possible to update components independently, at different times, with low risk of incompatibility. This saves applications from having to write if-else statements to process different schema versions, and saves the developer from having to look at old code to understand old schemas. Because all versions of the schema are stored in a human-readable JSON header, it’s easy to understand all the fields that you have available.
Avro supports many different programming languages. Because the schema is stored in JSON while the data is in binary, Avro is a relatively compact option for both persistent data storage and wire transfer. Avro is typically the format of choice for write-heavy workloads, since it is easy to append new rows.
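In Hive, for instance, an Avro-backed table can be created directly and a field added with a one-line change. A minimal sketch with illustrative table and column names; exact behaviour depends on the Hive version and on whether the schema is supplied via avro.schema.literal or avro.schema.url:
CREATE TABLE clickstream (user_id STRING, url STRING, ts BIGINT) STORED AS AVRO;
ALTER TABLE clickstream ADD COLUMNS (referrer STRING);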

APACHE PARQUET: A COLUMN BASED FORMAT
Parquet was developed by Cloudera and Twitter (and inspired by Google’s Dremel query system) to serve as an optimized columnar data store on Hadoop. Because data is stored by columns, it can be highly compressed and splittable (for the reasons noted above). Parquet is commonly used with Apache Impala, an analytics database for Hadoop. Impala is designed for low latency and high concurrency queries on Hadoop.
The column metadata for a Parquet file is stored at the end of the file, which allows for fast, one-pass writing. Metadata can include information such as data types, the compression/encoding scheme used (if any), statistics, element names, and more.
Parquet is especially adept at analyzing wide datasets with many columns. Each Parquet file contains binary data organized by “row group.” For each row group, the data values are organized by column. This enables the compression benefits that we described above. Parquet is a good choice for read-heavy workloads.
Generally, schema evolution in the Parquet file type is not an issue and is supported. However, not all systems that prefer Parquet support schema evolution optimally. For example, consider a columnar store like Impala. It is hard for that data store to support schema evolution, as the database needs to have two versions of schema (old and new) for a table.
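In Hive, for example, converting a table into Parquet is a single statement. A sketch, reusing the hypothetical clickstream table from the Avro section:
CREATE TABLE clickstream_parquet STORED AS PARQUET AS SELECT * FROM clickstream;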

APACHE ORC: A ROW-COLUMNAR BASED FORMAT
Optimized Row Columnar (ORC) format was first developed at Hortonworks to optimize storage and performance in Hive, a data warehouse for summarization, query and analysis that lives on top of Hadoop. Hive is designed for queries and analysis, and uses the query language HiveQL (similar to SQL).
ORC files are designed for high performance when Hive is reading, writing, and processing data. ORC stores row data in columnar format. This row-columnar format is highly efficient for compression and storage. It allows for parallel processing across a cluster, and the columnar format allows for skipping of unneeded columns for faster processing and decompression. ORC files can store data more efficiently without compression than compressed text files. Like Parquet, ORC is a good option for read-heavy workloads.
This advanced level of compression is possible because of its index system. ORC files contain groups of row data called “stripes”, with lightweight index entries recorded every 10,000 rows. Stripes are the data building blocks and are independent of each other, which means queries can skip straight to the stripe needed for any given query. Within each stripe, the reader can focus only on the columns required. The file footer includes descriptive statistics for each column within a stripe, such as count, sum, min, max, and whether null values are present.
ORC is designed to maximize storage and query efficiency. According to the Apache Foundation, “Facebook uses ORC to save tens of petabytes in their data warehouse and demonstrated that ORC is significantly faster than RC File or Parquet.”
Similar to Parquet, schema evolution is supported by the ORC file format, but its efficacy is dependent on what the data store supports. Recent advances have been made in Hive that allow for appending columns, type conversion, and name mapping.

Comparison between AVRO, ORC & PARQUET file

Tuesday, 28 May 2019

Hadoop Ecosystem

Big Data

Big data is the term for a collection of datasets so large and complex that it becomes difficult to process using on-hand database management tools or traditional data processing applications.
       V’s of Big Data: Volume, Variety, Velocity & Veracity

Hadoop ecosystem :

Hadoop Common: The common utilities that support the other Hadoop subprojects.
HDFS: A distributed file system that provides high throughput access to application data.
MapReduce: A software framework for distributed processing of large data sets on compute clusters.
Other Hadoop-related projects at Apache include:
Avro: A data serialization system.
Cassandra: A scalable multi-master database with no single points of failure.
HBase: A scalable, distributed database that supports structured data storage for large tables.
Hive: A data warehouse infrastructure that provides data summarization and ad hoc querying.
Mahout: A scalable machine learning and data mining library.
Pig: A high-level data-flow language and execution framework for parallel computation.
ZooKeeper: A high-performance coordination service for distributed applications
Hadoop Ecosystem :
Master Services :
1.  NameNode : 
     The NameNode is the master node of HDFS; it receives heartbeats and block reports from the DataNodes. It stores the metadata in RAM for quick access and keeps track of files across the Hadoop cluster. If the NameNode fails, the whole of HDFS becomes inaccessible, so the NameNode is critical for HDFS. It monitors the health of the DataNodes but does not hold the actual data, which lives only on the DataNodes. The NameNode tracks information about every file, such as where in the cluster it is saved, its access time, and which user is accessing it at a given time. The FsImage contains the complete state of the file system namespace since the start of the NameNode, and the edit logs contain all the recent modifications made to the file system with respect to the most recent FsImage.

Secondary NameNode : 
The Secondary NameNode assists the primary NameNode by periodically merging the edit logs into the FsImage (the namespace image). The merged checkpoint it keeps can be used to restart a failed NameNode, which is why it requires a large amount of memory and usually runs on a separate machine. In short, the Secondary NameNode is a checkpointing node for the NameNode, not a standby.
1.    Checkpoint Node (not a master service) :
      The Checkpoint Node was designed to address the drawbacks of the Secondary NameNode. It keeps the latest checkpoint in a directory with the same structure as the NameNode's. To create a checkpoint of the NameNode namespace, it downloads the FsImage and edit logs from the NameNode, merges them locally, and then uploads the new image back to the NameNode.
2.    Backup Node (not a master service) : 
     The Backup Node provides the same checkpointing function as the Checkpoint Node, but it also stays in sync with the NameNode by supporting online streaming of file system edits. Because it continuously receives this edit stream from the primary node, it maintains an up-to-date copy of the namespace in its own main memory, so it only needs to save that in-memory namespace rather than download the FsImage and edits.
3.    JobTracker : 
     The JobTracker manages MapReduce jobs and runs as its own process, usually on a separate node. It receives MapReduce requests from clients and locates the data by talking to the NameNode. The JobTracker chooses the best TaskTracker to execute each task and assigns it slots for execution. It monitors all the TaskTrackers and reports status back to the client. If the JobTracker fails, no MapReduce jobs can be executed and everything halts, so the JobTracker is critical for MapReduce.
      Slave Services :
1.  DataNode : 
     The DataNode stores the actual data of HDFS and is also called a slave. If a DataNode fails, no data is lost, because every block it holds is replicated on other DataNodes. DataNodes are configured with plenty of disk space because they store the actual data, and they perform read and write operations as per client requests. DataNodes operate according to the instructions of the NameNode.
2.  TaskTracker Node : 
     TaskTrackers run on the DataNodes. They are assigned Mapper and Reducer tasks to execute by the JobTracker. If a TaskTracker fails, the JobTracker assigns its tasks to another node, so the MapReduce job still completes successfully.
Blocks are nothing but the smallest contiguous locations on your hard drive where data is stored. The default size of each block is 128 MB in Apache Hadoop 2.x (64 MB in Apache Hadoop 1.x).
Replication Management: HDFS provides a reliable way to store huge amounts of data in a distributed environment as data blocks. The blocks are also replicated to provide fault tolerance. The default replication factor is 3, which is again configurable, so each block is replicated three times and stored on different DataNodes (considering the default replication factor):
Therefore, if you are storing a file of 128 MB in HDFS using the default configuration, you will end up occupying a space of 384 MB (3*128 MB) as the blocks will be replicated three times and each replica will be residing on a different DataNode.
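Block placement and replica counts for a file already in HDFS can be inspected from the command line; a sketch, with an illustrative path:
hdfs fsck /user/hadoop/sample.txt -files -blocks -locations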

How to Configure Replication Factor and Block Size for HDFS?
Hadoop Distributed File System (HDFS) stores files as data blocks and distributes these blocks across the entire cluster. As HDFS was designed to be fault-tolerant and to run on commodity hardware, blocks are replicated a number of times to ensure high data availability. The replication factor is a property that can be set in the HDFS configuration file that will allow you to adjust the global replication factor for the entire cluster. For each block stored in HDFS, there will be n – 1 duplicated blocks distributed across the cluster. For example, if the replication factor was set to 3 (default value in HDFS) there would be one original block and two replicas.

Open the hdfs-site.xml file. This file is usually found in the conf/ folder of the Hadoop installation directory. Change or add the following property to hdfs-site.xml:

<property> 
  <name>dfs.replication</name> 
  <value>3</value> 
  <description>Block Replication</description> 
</property>

hdfs-site.xml is used to configure HDFS. Changing the dfs.replication property in hdfs-site.xml will change the default replication for all files placed in HDFS.

You can also change the replication factor on a per-file basis using the Hadoop FS shell.

[umeshp6655@gw02 ~]$ hadoop fs -setrep -w 3 /my/file
Alternatively, you can change the replication factor of all the files under a directory.

[umeshp6655@gw02 ~]$ hadoop fs -setrep -R -w 3 /my/dir
Hadoop Distributed File System was designed to hold and manage large amounts of data; therefore typical HDFS block sizes are significantly larger than the block sizes you would see for a traditional filesystem (for example, the filesystem on my laptop uses a block size of 4 KB). The block size setting is used by HDFS to divide files into blocks and then distribute those blocks across the cluster. For example, if a cluster is using a block size of 64 MB, and a 128-MB text file was put in to HDFS, HDFS would split the file into two blocks (128 MB/64 MB) and distribute the two chunks to the data nodes in the cluster.

Open the hdfs-site.xml file. This file is usually found in the conf/ folder of the Hadoop installation directory. Set the following property in hdfs-site.xml:

<property> 
  <name>dfs.block.size</name> 
  <value>134217728</value> 
  <description>Block size</description> 
</property>

hdfs-site.xml is used to configure HDFS. Changing the dfs.block.size property in hdfs-site.xml will change the default block size for all the files placed into HDFS. In this case, we set the dfs.block.size to 128 MB. Changing this setting will not affect the block size of any files currently in HDFS. It will only affect the block size of files placed into HDFS after this setting has taken effect.
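The block size can also be overridden for a single upload using the generic -D option (dfs.blocksize is the Hadoop 2.x property name) and checked afterwards with -stat. A sketch; the paths are illustrative:
hadoop fs -D dfs.blocksize=268435456 -put bigfile.dat /data/bigfile.dat
# %o prints the block size and %r the replication factor
hadoop fs -stat "%o %r" /data/bigfile.dat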

Below is the formula to calculate the HDFS Storage size required, when building a new Hadoop cluster.

H = C * R * S / (1 - i) * 1.2  (i.e. 120%)
Where : 
C = Compression ratio. It depends on the type of compression used (Snappy, LZOP, …) and size of the data. When no compression is used, C=1.
R = Replication factor. It is usually 3 in a production cluster.
S = Initial size of data that needs to be moved to Hadoop. This could be a combination of historical data and incremental data. (We also need to consider the growth rate of the initial data, at least for the next 3-6 months. For example, if we have 500 TB of data now, expect 50 TB to be ingested in the next three months, and output files from MR jobs may add at least 10% of the initial data, then we should take 600 TB as the initial data size.)
i = intermediate data factor, usually 1/3 or 1/4. This is Hadoop's intermediate working space, dedicated to storing intermediate results of map tasks or any temporary storage used in Pig or Hive. This is a common guideline for many production applications; even Cloudera recommends reserving 25% for intermediate results.
120% – or 1.2 times the total size above. We have to allow room for the file system underlying HDFS; this is usually ext3 or ext4, which gets very, very unhappy at much above 80% fill. For example, if your cluster's raw capacity is 1200 TB, it is recommended to use only up to about 1000 TB.
Example :
C = 1, a replication factor of 3, and an intermediate factor of 0.25 = 1/4
H = 1 * 3 * S / (1 - 1/4) = 3 * S / (3/4) = 4 * S, and with the 20% file system allowance, H = 4 * S * 1.2 = 4.8 * S
With the assumptions above, the Hadoop storage is estimated to be roughly 4 to 5 times the size of the initial data.
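Plugging in the 600 TB initial data size used as an example above, an illustrative calculation:
H = 1 * 3 * 600 TB / (1 - 1/4) * 1.2 = 2400 TB * 1.2 = 2880 TB of raw HDFS capacity.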


RACK AWARENESS :
Anyways, moving ahead, let's talk more about how HDFS places replicas and what rack awareness is. Again, the NameNode also ensures that all the replicas are not stored on the same rack or a single rack. It follows an in-built Rack Awareness algorithm to reduce latency as well as provide fault tolerance. Considering a replication factor of 3, the Rack Awareness algorithm says that the first replica of a block will be stored on the local rack and the next two replicas will be stored on a different (remote) rack, but on different DataNodes within that remote rack. If you have more replicas, the rest of the replicas will be placed on random DataNodes, provided no more than two replicas reside on the same rack, where possible.

This is what an actual Hadoop production cluster looks like: multiple racks, each populated with DataNodes.
Advantages of Rack Awareness:
So, now you will be thinking why do we need a Rack Awareness algorithm? The reasons are:
  • To improve network performance: Communication between nodes residing on different racks goes through a switch, and in general there is greater network bandwidth between machines in the same rack than between machines in different racks. Rack Awareness therefore helps reduce write traffic between racks, giving better write performance, while read performance increases because you can use the bandwidth of multiple racks.
  • To prevent loss of data: We don't have to worry about the data even if an entire rack fails because of a switch failure or power failure. And if you think about it, it makes sense: as the saying goes, never put all your eggs in one basket.


What is batch processing in hadoop?
Batch processing is the execution of a series of jobs in a program on a computer without manual intervention (non-interactive). Strictly speaking, it is a processing mode: the execution of a series of programs each on a set or "batch" of inputs, rather than a single input (which would instead be a custom job). However, this distinction has largely been lost, and the series of steps in a batch process are often called a "job" or "batch job".
Hadoop is based on batch processing of big data. This means that the data is stored over a period of time and is then processed using Hadoop. Whereas in Spark, processing can take place in real-time.
Block (physical) and Split (logical): a block is the physical division of data in HDFS, while an input split is the logical division of data that MapReduce uses to assign work to individual mappers; split boundaries respect record boundaries and usually map onto one or more blocks.

Hadoop Mapreduce : 
MapReduce is a processing technique and a programming model for distributed computing, based on Java. A job flows through the following stages (a short worked example follows the list):
  • Input Splits
  • Mapper
  • Shuffling & Sorting
  • Reducer
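A quick way to see these stages is the classic word-count job; a sketch on a tiny two-line input:
  • Input splits: "apple banana" and "banana cherry"
  • Mapper emits: (apple,1) (banana,1) (banana,1) (cherry,1)
  • Shuffling & Sorting groups by key: apple -> [1], banana -> [1,1], cherry -> [1]
  • Reducer sums each list: (apple,1) (banana,2) (cherry,1)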

