In this article, we will analyze data from social bookmarking sites to find insights using Big Data technology. A bookmarking site lets users bookmark, review, rate, and search links on any topic, and the dataset comprises the information gathered from such sites. The data is in XML format and contains the categories that describe each link along with the ratings associated with it.
Problem Statement: Analyse the data in the Hadoop ecosystem to:
- Fetch the data into the Hadoop Distributed File System (HDFS) and analyze it with the help of MapReduce, Pig, and Hive to find the top-rated links based on user comments, likes, etc.
- Using MapReduce, convert the semi-structured XML data into a structured, comma-separated format.
- Push the MapReduce output to HDFS and then feed it into Pig, which splits the data into two parts: Category data and Rating data.
- Write a Hive query to analyze the data further and push the output into a relational database (RDBMS) using Sqoop.
Attribute Information or Dataset Details:
| Column |
|---|
| id |
| author |
| title |
| genre |
| price |
| publish_date |
| descriptions |
| review |
| rate |
| comments |
Data: Input Format – .XML
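For illustration, a single record in the input file might look like the following (the field values here are hypothetical; the tag names follow the dataset columns above and the tags parsed in MyMapper.java):

<book>
  <id>101</id>
  <author>John Smith</author>
  <title>Learning Hadoop</title>
  <genre>Computer</genre>
  <price>29.99</price>
  <publish_date>2015-01-10</publish_date>
  <descriptions>An introduction to the Hadoop ecosystem.</descriptions>
  <review>Good coverage of HDFS and MapReduce.</review>
  <rate>4.5</rate>
  <comments>120</comments>
</book>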
Technology Used
- Apache Hadoop (HDFS)
- MapReduce
- Apache Pig
- Apache Hive
- MySQL
- Shell Script
- Apache Sqoop
- Linux
- Java
MapReduce code to convert the XML file into a flat, comma-separated file.
MyMapper.java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private static final Log LOG = LogFactory.getLog(MyMapper.class);

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // Each input value is one <book>...</book> block handed over by XMLInputFormat.
            InputStream is = new ByteArrayInputStream(value.toString().getBytes());
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
            Document doc = dBuilder.parse(is);
            doc.getDocumentElement().normalize();

            NodeList nList = doc.getElementsByTagName("book");
            for (int temp = 0; temp < nList.getLength(); temp++) {
                Node nNode = nList.item(temp);
                if (nNode.getNodeType() == Node.ELEMENT_NODE) {
                    Element eElement = (Element) nNode;
                    // Extract each field from the XML record.
                    String id           = eElement.getElementsByTagName("id").item(0).getTextContent();
                    String author       = eElement.getElementsByTagName("author").item(0).getTextContent();
                    String title        = eElement.getElementsByTagName("title").item(0).getTextContent();
                    String genre        = eElement.getElementsByTagName("genre").item(0).getTextContent();
                    String price        = eElement.getElementsByTagName("price").item(0).getTextContent();
                    String publish_date = eElement.getElementsByTagName("publish_date").item(0).getTextContent();
                    String descriptions = eElement.getElementsByTagName("descriptions").item(0).getTextContent();
                    String review       = eElement.getElementsByTagName("review").item(0).getTextContent();
                    String rate         = eElement.getElementsByTagName("rate").item(0).getTextContent();
                    String comments     = eElement.getElementsByTagName("comments").item(0).getTextContent();

                    // Emit the record as one comma-separated line; no reduce phase is needed.
                    context.write(new Text(id + "," + author + "," + title + "," + genre + "," + price + ","
                            + publish_date + "," + descriptions + "," + review + "," + rate + "," + comments),
                            NullWritable.get());
                }
            }
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
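Each <book> element is thus written out as a single line in the form id,author,title,genre,price,publish_date,descriptions,review,rate,comments (values as in the hypothetical record shown earlier), which is the structured layout the Pig and Hive steps below expect.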
XMLDriver.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class XMLDriver {

    /** Bhavesh - for processing XML file using Hadoop MapReduce
     * @param args input path and output path
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        try {
            Configuration conf = new Configuration();
            String[] arg = new GenericOptionsParser(conf, args).getRemainingArgs();

            // Tell the custom input format which XML tags delimit one record.
            conf.set("START_TAG_KEY", "<book>");
            conf.set("END_TAG_KEY", "</book>");

            Job job = new Job(conf, "XML Processing");
            job.setJarByClass(XMLDriver.class);
            job.setMapperClass(MyMapper.class);

            // Map-only job: the mapper already writes the final CSV lines.
            job.setNumReduceTasks(0);

            job.setInputFormatClass(XMLInputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job, new Path(arg[0]));
            FileOutputFormat.setOutputPath(job, new Path(arg[1]));

            job.waitForCompletion(true);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
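The three classes can be compiled against the Hadoop client libraries and packaged into the XMLProcessing.jar that the shell script at the end of this article submits. A minimal sketch, assuming the source files sit in the current directory (the output directory name "classes" is just an example):

mkdir -p classes
javac -classpath "$(hadoop classpath)" -d classes XMLInputFormat.java MyMapper.java XMLDriver.java
jar -cvf XMLProcessing.jar -C classes .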
XMLInputFormat.java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/* Bhavesh - Creating XMLInputFormat class for reading the XML file:
   each record handed to the mapper is one <book>...</book> block. */
public class XMLInputFormat extends TextInputFormat {

    public static final String START_TAG_KEY = "<book>";
    public static final String END_TAG_KEY = "</book>";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {

        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) is;
            // Use the tags set by the driver (START_TAG_KEY / END_TAG_KEY), falling back to <book>...</book>.
            startTag = tac.getConfiguration().get("START_TAG_KEY", START_TAG_KEY).getBytes("utf-8");
            endTag = tac.getConfiguration().get("END_TAG_KEY", END_TAG_KEY).getBytes("utf-8");
            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(tac.getConfiguration());
            fsin = fs.open(fileSplit.getPath());
            fsin.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            // Scan forward for the next start tag, then buffer everything up to the matching end tag.
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            value.set(buffer.getData(), 0, buffer.getLength());
                            key.set(fsin.getPos());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public void close() throws IOException {
            fsin.close();
        }

        // Read byte-by-byte until the given tag is matched; optionally copy the bytes into the record buffer.
        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                if (b == -1) return false;              // end of file
                if (withinBlock) buffer.write(b);
                if (b == match[i]) {
                    i++;
                    if (i >= match.length) return true; // full tag matched
                } else {
                    i = 0;
                }
                // Stop looking for a start tag once we pass the end of this split.
                if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
            }
        }
    }
}
Apache Pig Script - bookmarkanalysis.pig
Input_File = LOAD '/BookMarkOutput/' USING PigStorage(',') AS
    (book_id:chararray, author:chararray, Title:chararray, genre:chararray, price:float,
     publish_date:chararray, description:chararray, review:chararray, rate:float, comments:chararray);

-- Split the data by category (genre)
orderedfile1 = ORDER Input_File BY genre;
SPLIT orderedfile1 INTO computer_file IF genre == 'Computer', database_file IF genre == 'Database';
STORE computer_file INTO '/BookMarkOutput/Type_Computer/' USING PigStorage(',');
STORE database_file INTO '/BookMarkOutput/Type_Database/' USING PigStorage(',');

-- Split the data by rating
orderedfile2 = ORDER Input_File BY rate DESC;
SPLIT orderedfile2 INTO rate5plus IF rate >= 5 AND rate <= 10, rate5minus IF rate >= 1 AND rate < 5;
STORE rate5plus INTO '/BookMarkOutput/Rating5+/' USING PigStorage(',');
STORE rate5minus INTO '/BookMarkOutput/Rating5-/' USING PigStorage(',');
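After the Pig job completes, the split outputs can be spot-checked directly from HDFS, for example as below (the part file name may differ depending on the cluster and Pig version):

hadoop fs -ls /BookMarkOutput/Type_Computer
hadoop fs -cat /BookMarkOutput/Type_Computer/part-r-00000 | head -5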
Shell Script
#####################################################################
############################# COMPLETE SCRIPT #######################
### HEADER - PROGRAM NAME - <bookmarkanalysis.sh>
### AUTHOR - BHAVESH BHADRICHA
### DATE - 12/NOV/2015
### VERSION - 1.0
### DESCRIPTION - Data: information gathered from social bookmarking
### sites, which allow users to bookmark, review, rate and search
### links on any topic. The data is in XML format and contains the
### links/posts URL, the categories defining it and the ratings
### linked with it.
### Problem Statement: Analyse the data in the Hadoop eco-system to:
### 1. Fetch the data into the Hadoop Distributed File System and
###    analyze it with the help of MapReduce, Pig and Hive to find
###    the top rated books based on the user comments, likes etc.
### 2. Using MapReduce, convert the semi-structured format (XML data)
###    into structured format and categorize the user rating as
###    positive and negative for each of the thousand links.
### 3. Push the output to HDFS and then feed it into PIG, which splits
###    the data into two parts: Category data and Ratings data.
### 4. Write a Hive query to analyze the data further and push the
###    output into a relational database (RDBMS) using Sqoop.
###############################################################################

##################################
###DEFINING THE LOCAL VARIABLES###
##################################
DATE=$(date +"%Y%m%d_%H%M%S")
LOGFILE="/home/bhavesh/Bookmark_POC/LOG/"$DATE".log"

##################################################################################
############## Converting XML to Flatfile using Mapreduce ########################
##################################################################################
echo "Mapreduce Program starts here"
echo "Converting XML to Flatfile using Mapreduce" >> $LOGFILE
hadoop fs -rmr /BookMarkOutput
hadoop jar /home/bhavesh/Bookmark_POC/Mapreduce/XMLProcessing.jar XMLDriver /BookMarkInput/* /BookMarkOutput
if [ $? -eq 0 ]; then
    echo "Successfully finished Mapreduce Processing" >> $LOGFILE
else
    echo "XMLProcessing MapReduce Failed, please check the log" >> $LOGFILE
fi

########################## PIG Processing ########################################
#### PIG, which splits the data into two parts: Category data and Ratings data ###
###################################################################################
echo "Pig Script starts here"
echo "PIG Script, which splits the data into two parts: Category data and Ratings data" >> $LOGFILE
hadoop fs -rmr /BookMarkOutput/Type_Computer
hadoop fs -rmr /BookMarkOutput/Type_Database
hadoop fs -rmr /BookMarkOutput/Rating5+
hadoop fs -rmr /BookMarkOutput/Rating5-
pig /home/bhavesh/Bookmark_POC/PIG/bookmarkanalysis.pig
if [ $? -eq 0 ]; then
    echo "Successfully finished PIG Processing" >> $LOGFILE
else
    echo "PIG Processing Failed, please check the log" >> $LOGFILE
fi

############################ HIVE Processing #######################################
###### HIVE will load the Category data and Rating data into Hive tables ###########
#####################################################################################
echo "HIVE Script starts here"
echo "HIVE loads the data into two parts: Category data tables and Ratings data tables" >> $LOGFILE

hive -e 'drop table if exists ComputerBooks';
hive -e 'drop table if exists DatabaseBooks';
hive -e 'drop table if exists Highest_Rating';
hive -e 'drop table if exists Lowest_Rating';

hive -e "create external table ComputerBooks (Bookid string, author string, title string, genre string, price float, publish_date string, descriptions string, review string, rate float, comments int) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile location '/BookMarkOutput/hive/Computerbooks'";
hive -e "create external table DatabaseBooks (Bookid string, author string, title string, genre string, price float, publish_date string, descriptions string, review string, rate float, comments int) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile location '/BookMarkOutput/hive/Databasebooks'";
hive -e "create external table Highest_Rating (Bookid string, author string, title string, genre string, price float, publish_date string, descriptions string, review string, rate float, comments int) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile location '/BookMarkOutput/hive/HighestRating'";
hive -e "create external table Lowest_Rating (Bookid string, author string, title string, genre string, price float, publish_date string, descriptions string, review string, rate float, comments int) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile location '/BookMarkOutput/hive/LowestRating'";

hive -e "load data inpath '/BookMarkOutput/Type_Computer/part-r-00000' overwrite into table ComputerBooks";
hive -e "load data inpath '/BookMarkOutput/Type_Database/part-r-00000' overwrite into table DatabaseBooks";
hive -e "load data inpath '/BookMarkOutput/Rating5+/part-r-00000' overwrite into table Highest_Rating";
hive -e "load data inpath '/BookMarkOutput/Rating5-/part-r-00000' overwrite into table Lowest_Rating";

############################ SQOOP Processing ######################################
###### Pushing the HIVE table data into RDBMS tables via SQOOP #####################
#####################################################################################
sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table ComputerBooks --export-dir /BookMarkOutput/hive/Computerbooks/part-r-00000 --input-fields-terminated-by ','
sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table Databasebooks --export-dir /BookMarkOutput/hive/Databasebooks/part-r-00000 --input-fields-terminated-by ','
sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table HighestRating --export-dir /BookMarkOutput/hive/HighestRating/part-r-00000 --input-fields-terminated-by ','
sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table LowestRating --export-dir /BookMarkOutput/hive/LowestRating/part-r-00000 --input-fields-terminated-by ','
####################################################################################
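The analysis query called for in step 4 of the problem statement is not spelled out in the script above; a minimal sketch of such a query, run in the same hive -e style against the Highest_Rating table defined there, could rank books by rating and comment count (the ordering columns and the LIMIT value are illustrative):

hive -e "select Bookid, title, genre, rate, comments from Highest_Rating order by rate desc, comments desc limit 10"

Note also that sqoop export does not create its target tables: the ComputerBooks, Databasebooks, HighestRating and LowestRating tables must already exist in the MySQL database named in the --connect URL, with columns matching the exported fields, before the script is run.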