Analyze Social Bookmarking Sites to Find Insights - Part 1

In this article, we will analyze social bookmarking sites to find insights using Big Data technology. The data comprises information gathered from bookmarking sites, which let users bookmark, review, rate, and search links on any topic. The data is in XML format and contains the categories that describe each link along with the ratings associated with it.

Problem Statement: Analyze the data in the Hadoop ecosystem to:

  1. Fetch the data into the Hadoop Distributed File System and analyze it with the help of MapReduce, Pig, and Hive to find the top-rated links based on user comments, likes, etc.
  2. Using MapReduce, convert the semi-structured XML data into a structured format and categorize the user ratings as positive or negative for each of the thousand links.
  3. Push the MapReduce output to HDFS and then feed it into Pig, which splits the data into two parts: category data and rating data.
  4. Write a Hive query to analyze the data further and push the output into a relational database (RDBMS) using Sqoop.

Attribute Information or Dataset Details:

Columns: id, author, title, genre, price, publish_date, descriptions, review, rate, comments

Data: Input Format – XML
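
For reference, one record of the input could look like the following (an illustrative sketch only; the element names are the ones the mapper below extracts, the values are made up):

<book>
  <id>bk101</id>
  <author>John Doe</author>
  <title>Learning Hadoop</title>
  <genre>Computer</genre>
  <price>39.95</price>
  <publish_date>2015-10-01</publish_date>
  <descriptions>An introduction to the Hadoop ecosystem.</descriptions>
  <review>Good for beginners</review>
  <rate>8</rate>
  <comments>25</comments>
</book>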

Technology Used

  1. Apache Hadoop (HDFS)
  2. MapReduce
  3. Apache Pig
  4. Apache Hive
  5. MySQL
  6. Shell Script
  7. Apache Sqoop
  8. Linux
  9. Java

MapReduce Code to Convert the XML File into a Comma-Separated Flat File

MyMapper.java

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private static final Log LOG = LogFactory.getLog(MyMapper.class);

    // Each input value is one <book>...</book> block supplied by XMLInputFormat.

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        try {

            InputStream is = new ByteArrayInputStream(value.toString().getBytes());
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
            Document doc = dBuilder.parse(is);

            doc.getDocumentElement().normalize();

            NodeList nList = doc.getElementsByTagName("book");

            for (int temp = 0; temp < nList.getLength(); temp++) {

                Node nNode = nList.item(temp);

                if (nNode.getNodeType() == Node.ELEMENT_NODE) {

                    Element eElement = (Element) nNode;

                    String id = eElement.getElementsByTagName("id").item(0).getTextContent();
                    String author = eElement.getElementsByTagName("author").item(0).getTextContent();
                    String title = eElement.getElementsByTagName("title").item(0).getTextContent();
                    String genre = eElement.getElementsByTagName("genre").item(0).getTextContent();
                    String price = eElement.getElementsByTagName("price").item(0).getTextContent();
                    String publish_date = eElement.getElementsByTagName("publish_date").item(0).getTextContent();
                    String descriptions = eElement.getElementsByTagName("descriptions").item(0).getTextContent();
                    String review = eElement.getElementsByTagName("review").item(0).getTextContent();
                    String rate = eElement.getElementsByTagName("rate").item(0).getTextContent();
                    String comments = eElement.getElementsByTagName("comments").item(0).getTextContent();
            
                    context.write(new Text(id + "," + author + "," + title + "," + genre + "," + price + "," + publish_date + "," + descriptions + "," + review + "," + rate + "," + comments), NullWritable.get());

                }
            }
        } catch (Exception e) {
              throw new IOException(e);
        }

    }

}
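
For the sample record shown earlier, the mapper would emit a single comma-separated line (values illustrative). Note that a field containing a comma would shift the downstream columns, so the input is assumed to be comma-free:

bk101,John Doe,Learning Hadoop,Computer,39.95,2015-10-01,An introduction to the Hadoop ecosystem.,Good for beginners,8,25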

XMLDriver.java

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class XMLDriver {

    /** Bhavesh - for processing XML file using Hadoop MapReduce
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        try {

            Configuration conf = new Configuration();

            // Separate the generic Hadoop options from the program arguments.
            String[] arg = new GenericOptionsParser(conf, args).getRemainingArgs();

            // Tell XMLInputFormat which tags delimit one record.
            conf.set("START_TAG_KEY", "<book>");
            conf.set("END_TAG_KEY", "</book>");

            Job job = Job.getInstance(conf, "XML Processing");
            job.setJarByClass(XMLDriver.class);
            job.setMapperClass(MyMapper.class);

            // Map-only job: the mapper writes the CSV lines directly.
            job.setNumReduceTasks(0);

            job.setInputFormatClass(XMLInputFormat.class);

            // The mapper emits the CSV record as the key and NullWritable as the value.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job, new Path(arg[0]));
            FileOutputFormat.setOutputPath(job, new Path(arg[1]));

            // Exit with a non-zero status if the job fails so the calling
            // shell script's $? check is meaningful.
            System.exit(job.waitForCompletion(true) ? 0 : 1);

        } catch (Exception e)
        {
              throw new IOException(e);
          
        }
      

    }

}
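
The three classes above are packaged into the XMLProcessing.jar that the shell script below runs. A minimal way to build it (a sketch; the class layout and default package are assumed from the listings above):

javac -classpath "$(hadoop classpath)" MyMapper.java XMLDriver.java XMLInputFormat.java
jar cvf XMLProcessing.jar *.class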

XMLInputFormat.java

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class XMLInputFormat extends TextInputFormat {
    public static final String START_TAG_KEY = "<book>";
    public static final String END_TAG_KEY = "</book>";

    /*Bhavesh - Creating XMLInputformat Class for reading XML File*/
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends
            RecordReader<LongWritable, Text> {
        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) is;

            // Read the record-delimiting tags from the job configuration
            // (set in XMLDriver), falling back to the defaults declared above.
            startTag = tac.getConfiguration()
                    .get("START_TAG_KEY", START_TAG_KEY).getBytes("utf-8");
            endTag = tac.getConfiguration()
                    .get("END_TAG_KEY", END_TAG_KEY).getBytes("utf-8");

            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();

            FileSystem fs = file.getFileSystem(tac.getConfiguration());
            fsin = fs.open(file);
            fsin.seek(start);

        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {

                            value.set(buffer.getData(), 0, buffer.getLength());
                            key.set(fsin.getPos());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException,
                InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;

        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public void close() throws IOException {
            fsin.close();
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock)
                throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();

                if (b == -1)
                    return false;

                if (withinBlock)
                    buffer.write(b);

                if (b == match[i]) {
                    i++;
                    if (i >= match.length)
                        return true;
                } else
                    i = 0;

                if (!withinBlock && i == 0 && fsin.getPos() >= end)
                    return false;
            }
        }

    }

}

Apache Pig Script - bookmarkanalysis.pig

Input_File = LOAD '/BookMarkOutput/' using PigStorage(',') as
    (book_id:chararray, author:chararray, title:chararray, genre:chararray, price:float,
     publish_date:chararray, description:chararray, review:chararray, rate:float, comments:chararray);

orderedfile1 = order Input_File by genre;
split orderedfile1 into computer_file if genre == 'Computer',
                        database_file if genre == 'Database';
-- Store with an explicit comma delimiter so the Hive tables below (declared with
-- fields terminated by ',') can read these files; the default PigStorage() delimiter is a tab.
store computer_file into '/BookMarkOutput/Type_Computer/' using PigStorage(',');
store database_file into '/BookMarkOutput/Type_Database/' using PigStorage(',');

orderedfile2 = order Input_File by rate desc;
split orderedfile2 into rate5plus if rate >= 5 and rate <= 10,
                        rate5minus if rate >= 1 and rate < 5;
store rate5plus into '/BookMarkOutput/Rating5+/' using PigStorage(',');
store rate5minus into '/BookMarkOutput/Rating5-/' using PigStorage(',');
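
The script is run by the wrapper shell script below; the split outputs can then be checked with standard HDFS commands, for example:

pig /home/bhavesh/Bookmark_POC/PIG/bookmarkanalysis.pig
hadoop fs -cat /BookMarkOutput/Type_Computer/part-r-00000 | head
hadoop fs -cat '/BookMarkOutput/Rating5+/part-r-00000' | head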

Shell Script

#####################################################################
#############################  COMPLETE SCRIPT   ###################
### HEADER - PROGRAM NAME - <bookmarkanalysis.sh>                              
### AUTHOR - BHAVESH BHADRICHA                                                          
### DATE  - 12/NOV/2015                                                                                     
### VERSION - 1.0                                                                                                 
### DESCRIPTION - Data: information gathered from bookmarking sites, which
### allow users to bookmark, review, rate and search links on any topic.
### The data is in XML format and contains the link/post URLs, the
### categories defining them and the ratings linked with them.
### Problem Statement: Analyze the data in the Hadoop eco-system to:
### 1. Fetch the data into the Hadoop Distributed File System and analyze
###    it with the help of MapReduce, Pig and Hive to find the top-rated
###    books based on the user comments, likes etc.
### 2. Using MapReduce, convert the semi-structured format (XML data) into
###    a structured format and categorize the user rating as positive or
###    negative for each of the thousand links.
### 3. Push the output to HDFS and then feed it into PIG, which splits the
###    data into two parts: Category data and Ratings data.
### 4. Write a Hive query to analyze the data further and push the output
###    into a relational database (RDBMS) using Sqoop.
###############################################################################
###############################################################################
##################################
###DEFINING THE LOCAL VARIABLES###
##################################
DATE=$(date +"%Y%m%d_%H%M%S")
LOGFILE="/home/bhavesh/Bookmark_POC/LOG/"$DATE".log"
##################################################################################
############## Converting XML to Flatfile using Mapreduce ########################
##################################################################################
echo "Mapreduce Program starts here"

echo "Converting XML to Flatfile using Mapreduce" >> $LOGFILE

hadoop fs -rmr /BookMarkOutput

hadoop jar /home/bhavesh/Bookmark_POC/Mapreduce/XMLProcessing.jar XMLDriver /BookMarkInput/* /BookMarkOutput

if [ $? -eq 0 ]; then
    echo "Succesfully finished Mapreduce Processing " >> $LOGFILE
else
    echo "XMLProcessing MapReduce Failed Please check the Log " >> $LOGFILE
fi

########################## PIG Processing ########################################
#### PIG, which splits the data into two parts: Category data and Ratings data ###
##################################################################################

echo "Pig Script starts here"

echo "PIG Script,which splits the data into two parts: Category data and Ratings data" >> $LOGFILE

hadoop fs -rmr /BookMarkOutput/Type_Computer
hadoop fs -rmr /BookMarkOutput/Type_Database
hadoop fs -rmr /BookMarkOutput/Rating5+
hadoop fs -rmr /BookMarkOutput/Rating5-

pig /home/bhavesh/Bookmark_POC/PIG/bookmarkanalysis.pig

if [ $? -eq 0 ]; then
    echo "Succesfully finished PIG  Processing " >> $LOGFILE
else
    echo "PIG Processing Failed Please check the Log " >> $LOGFILE
fi


############################ HIVE Processing #######################################
###### HIVE will load the Category data and Rating Data into Hive Tables  ##########
####################################################################################

echo "HIVE Script starts here"

echo "HIVE which LOAD the data into two parts: Category data Tables and Ratings data Table " >> $LOGFILE

hive -e 'drop table if exists ComputerBooks';
hive -e 'drop table if exists DatabaseBooks';
hive -e 'drop table if exists Highest_Rating';
hive -e 'drop table if exists Lowest_Rating';

hive -e "create external table ComputerBooks
(Bookid string,
author string,
title string,
genre string,
price float,
publish_date string,
descriptions string,
review string,
rate float,
comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/Computerbooks'";

hive -e "create external table DatabaseBooks
(Bookid string,
author string,
title string,
genre string,
price float,
publish_date string,
descriptions string,
review string,
rate float,
comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/Databasebooks'";

hive -e "create external table Highest_Rating
(Bookid string,
author string,
title string,
genre string,
price float,
publish_date string,
descriptions string,
review string,
rate float,
comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/HighestRating'";

hive -e "create external table Lowest_Rating
(Bookid string,
author string,
title string,
genre string,
price float,
publish_date string,
descriptions string,
review string,
rate float,
comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/LowestRating'";

hive -e "load data inpath '/BookMarkOutput/Type_Computer/part-r-00000' overwrite into table ComputerBooks";
hive -e "load data inpath '/BookMarkOutput/Type_Database/part-r-00000' overwrite into table DatabaseBooks";
hive -e "load data inpath '/BookMarkOutput/Rating5+/part-r-00000' overwrite into table Highest_Rating";
hive -e "load data inpath '/BookMarkOutput/Rating5-/part-r-00000' overwrite into table Lowest_Rating";

############################ SQOOP Processing #######################################
###### Pushing the Hive table data into RDBMS tables via Sqoop ######################
#####################################################################################

sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table ComputerBooks --export-dir /BookMarkOutput/hive/Computerbooks/part-r-00000 --input-fields-terminated-by ','

sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table Databasebooks --export-dir /BookMarkOutput/hive/Databasebooks/part-r-00000 --input-fields-terminated-by ','

sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table HighestRating --export-dir /BookMarkOutput/hive/HighestRating/part-r-00000 --input-fields-terminated-by ','

sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table LowestRating --export-dir /BookMarkOutput/hive/LowestRating/part-r-00000 --input-fields-terminated-by ','
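
### Note: sqoop export loads into MySQL tables that must already exist.
### An illustrative DDL for one such target table (column types assumed
### from the Hive schema above):
###   CREATE TABLE ComputerBooks (Bookid VARCHAR(32), author VARCHAR(255),
###     title VARCHAR(255), genre VARCHAR(64), price FLOAT,
###     publish_date VARCHAR(32), descriptions TEXT, review TEXT,
###     rate FLOAT, comments INT);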

####################################################################################
By Bhavesh