In this article, we will analyze social bookmarking sites to find insights using big data technology. The data comprises information gathered from bookmarking sites, which let you bookmark, review, rate, and search various links on any topic. The data is in XML format and contains the categories defining each link and the ratings linked with it.

Problem Statement: Analyse the data in the Hadoop ecosystem to:
- Fetch the data into the Hadoop Distributed File System and analyze it with the help of MapReduce, Pig, and Hive to find the top-rated links based on the user comments, likes, etc.
- Using MapReduce, convert the semi-structured format (XML data) into a structured format and categorize the user ratings as positive or negative for each link.
- Push the MapReduce output to HDFS and then feed it into Pig, which splits the data into two parts: category data and ratings data.
- Write a Hive query to analyze the data further and push the output into a relational database (RDBMS) using Sqoop.

Attribute Information or Dataset Details:
Data: Input Format – .XML
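
The raw dataset is not reproduced here, but based on the tags the mapper below extracts, one input record looks roughly like this (the field values are illustrative, not taken from the actual data):

XML
<book>
  <id>bk101</id>
  <author>Jane Smith</author>
  <title>An Introduction to Hadoop</title>
  <genre>Computer</genre>
  <price>39.95</price>
  <publish_date>2015-10-01</publish_date>
  <descriptions>An overview of the Hadoop ecosystem.</descriptions>
  <review>Well structured and easy to follow.</review>
  <rate>7.5</rate>
  <comments>120</comments>
</book>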

Technology Used
- Apache Hadoop (HDFS)
- MapReduce
- Apache Pig
- Apache Hive
- MySQL
- Shell Script
- Apache Sqoop
- Linux
- Java
MapReduce code to convert the XML file into a flat, comma-separated file.
MyMapper.java
Java

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private static final Log LOG = LogFactory.getLog(MyMapper.class);

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // Each input value is one complete <book>...</book> element
            // handed over by XMLInputFormat; parse it as a small DOM document.
            InputStream is = new ByteArrayInputStream(value.toString().getBytes());
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
            Document doc = dBuilder.parse(is);

            doc.getDocumentElement().normalize();

            NodeList nList = doc.getElementsByTagName("book");

            for (int temp = 0; temp < nList.getLength(); temp++) {

                Node nNode = nList.item(temp);

                if (nNode.getNodeType() == Node.ELEMENT_NODE) {

                    Element eElement = (Element) nNode;

                    // Pull out every field of the record by its tag name.
                    String id = eElement.getElementsByTagName("id").item(0).getTextContent();
                    String author = eElement.getElementsByTagName("author").item(0).getTextContent();
                    String title = eElement.getElementsByTagName("title").item(0).getTextContent();
                    String genre = eElement.getElementsByTagName("genre").item(0).getTextContent();
                    String price = eElement.getElementsByTagName("price").item(0).getTextContent();
                    String publish_date = eElement.getElementsByTagName("publish_date").item(0).getTextContent();
                    String descriptions = eElement.getElementsByTagName("descriptions").item(0).getTextContent();
                    String review = eElement.getElementsByTagName("review").item(0).getTextContent();
                    String rate = eElement.getElementsByTagName("rate").item(0).getTextContent();
                    String comments = eElement.getElementsByTagName("comments").item(0).getTextContent();

                    // Emit one comma-separated line per book as the key; the
                    // value is NullWritable because the line itself is the output.
                    context.write(new Text(id + "," + author + "," + title + "," + genre + "," + price + ","
                            + publish_date + "," + descriptions + "," + review + "," + rate + "," + comments),
                            NullWritable.get());
                }
            }
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
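
Because the job runs with zero reducers, each map output file under /BookMarkOutput already contains the final flat records: the whole comma-separated line is written as the key with a NullWritable value, and TextOutputFormat writes only the key when the value is null, so each output line is exactly one CSV record.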
XMLDriver.java
Java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class XMLDriver {

    /** Bhavesh - for processing XML file using Hadoop MapReduce
     * @param args input path and output path
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        try {

            Configuration conf = new Configuration();

            String[] arg = new GenericOptionsParser(conf, args).getRemainingArgs();

            // Tags that delimit one record; XMLInputFormat reads these.
            conf.set("START_TAG_KEY", "<book>");
            conf.set("END_TAG_KEY", "</book>");

            Job job = Job.getInstance(conf, "XML Processing");
            job.setJarByClass(XMLDriver.class);
            job.setMapperClass(MyMapper.class);

            // Map-only job: the mapper already emits the final CSV lines.
            job.setNumReduceTasks(0);

            job.setInputFormatClass(XMLInputFormat.class);

            // Output types must match what the mapper actually writes
            // (Text key, NullWritable value).
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job, new Path(arg[0]));
            FileOutputFormat.setOutputPath(job, new Path(arg[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);

        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
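
The shell script at the end of this article expects the job packaged as XMLProcessing.jar. A minimal build-and-run sketch, assuming a Hadoop 2.x client on the PATH and the three source files in the current directory:

Shell
# Compile against the Hadoop client libraries and package the classes.
mkdir -p classes
javac -classpath "$(hadoop classpath)" -d classes MyMapper.java XMLDriver.java XMLInputFormat.java
jar cf XMLProcessing.jar -C classes .

# Run the map-only job: input XML in /BookMarkInput, CSV out to /BookMarkOutput.
hadoop jar XMLProcessing.jar XMLDriver /BookMarkInput /BookMarkOutput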
XMLInputFormat.java
Java

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/* Bhavesh - Creating XMLInputFormat class for reading XML file */
public class XMLInputFormat extends TextInputFormat {

    // Configuration keys set by the driver; the reader falls back to the
    // <book> tags of this data set if they are absent.
    public static final String START_TAG_KEY = "START_TAG_KEY";
    public static final String END_TAG_KEY = "END_TAG_KEY";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {

        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) is;

            // Read the record delimiters from the job configuration so the
            // driver's conf.set(...) calls actually take effect.
            startTag = tac.getConfiguration().get(START_TAG_KEY, "<book>").getBytes("utf-8");
            endTag = tac.getConfiguration().get(END_TAG_KEY, "</book>").getBytes("utf-8");

            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();

            FileSystem fs = file.getFileSystem(tac.getConfiguration());
            fsin = fs.open(fileSplit.getPath());
            fsin.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsin.getPos() < end) {
                // Skip ahead to the next start tag, then buffer everything
                // up to and including the matching end tag as one record.
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            value.set(buffer.getData(), 0, buffer.getLength());
                            key.set(fsin.getPos());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public void close() throws IOException {
            fsin.close();
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock)
                throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();

                // End of file: no more records.
                if (b == -1)
                    return false;

                // While inside a record, keep every byte we read.
                if (withinBlock)
                    buffer.write(b);

                // Advance the match counter while bytes line up with the tag.
                if (b == match[i]) {
                    i++;
                    if (i >= match.length)
                        return true;
                } else
                    i = 0;

                // Outside a record and past the split boundary: stop scanning.
                if (!withinBlock && i == 0 && fsin.getPos() >= end)
                    return false;
            }
        }
    }
}
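
The record reader works on the raw byte stream rather than a DOM: readUntilMatch scans forward byte by byte until it finds the start tag, then buffers everything up to and including the matching end tag and hands that buffer to the mapper as a single record. A record that begins before a split boundary is still read to completion, because the end-of-split check only applies while scanning for a start tag. This is the widely used streaming-XML input pattern for Hadoop (the same approach as Mahout's XmlInputFormat).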
Apache Pig Script - bookmarkanalysis.pig
Pig

-- Load the comma-separated MapReduce output with a typed schema.
Input_File = LOAD '/BookMarkOutput/' USING PigStorage(',') AS
    (book_id:chararray, author:chararray, title:chararray, genre:chararray, price:float,
     publish_date:chararray, description:chararray, review:chararray, rate:float, comments:chararray);

-- Category data: order by genre and split into one relation per category.
orderedfile1 = ORDER Input_File BY genre;
SPLIT orderedfile1 INTO computer_file IF genre == 'Computer',
                        database_file IF genre == 'Database';
STORE computer_file INTO '/BookMarkOutput/Type_Computer/' USING PigStorage(',');
STORE database_file INTO '/BookMarkOutput/Type_Database/' USING PigStorage(',');

-- Ratings data: order by rating and split into high (5-10) and low (1-5).
orderedfile2 = ORDER Input_File BY rate DESC;
SPLIT orderedfile2 INTO rate5plus IF rate >= 5 AND rate <= 10,
                        rate5minus IF rate >= 1 AND rate < 5;
STORE rate5plus INTO '/BookMarkOutput/Rating5+/' USING PigStorage(',');
STORE rate5minus INTO '/BookMarkOutput/Rating5-/' USING PigStorage(',');
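
Note that SPLIT routes each record into every branch whose condition it satisfies, so the genre split and the rating split produce four independent data sets from the same input. The STORE statements name PigStorage(',') explicitly: Pig's default store delimiter is a tab, which would silently break the comma-delimited Hive tables created in the next step.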
Shell Script
Shell

###############################################################################
############################# COMPLETE SCRIPT #################################
### HEADER - PROGRAM NAME - <bookmarkanalysis.sh>
### AUTHOR - BHAVESH BHADRICHA
### DATE - 12/NOV/2015
### VERSION - 1.0
### DESCRIPTION - Data: information gathered from bookmarking sites, which
### let users bookmark, review, rate, and search various links on any topic.
### The data is in XML format and contains the link/post URLs, the categories
### defining them, and the ratings linked with them.
### Problem Statement: Analyse the data in the Hadoop ecosystem to:
### 1. Fetch the data into the Hadoop Distributed File System and analyze it
###    with the help of MapReduce, Pig, and Hive to find the top-rated books
###    based on the user comments, likes, etc.
### 2. Using MapReduce, convert the semi-structured format (XML data) into a
###    structured format and categorize the user ratings as positive or
###    negative for each of the thousand links.
### 3. Push the output to HDFS and then feed it into Pig, which splits the
###    data into two parts: category data and ratings data.
### 4. Write a Hive query to analyze the data further and push the output
###    into a relational database (RDBMS) using Sqoop.
###############################################################################
###############################################################################

##################################
### DEFINING THE LOCAL VARIABLES ###
##################################
DATE=$(date +"%Y%m%d_%H%M%S")
LOGFILE="/home/bhavesh/Bookmark_POC/LOG/"$DATE".log"

##################################################################################
############## Converting XML to flat file using MapReduce ######################
##################################################################################
echo "MapReduce program starts here"

echo "Converting XML to flat file using MapReduce" >> $LOGFILE

hadoop fs -rmr /BookMarkOutput

hadoop jar /home/bhavesh/Bookmark_POC/Mapreduce/XMLProcessing.jar XMLDriver /BookMarkInput/* /BookMarkOutput

if [ $? -eq 0 ]; then
    echo "Successfully finished MapReduce processing" >> $LOGFILE
else
    echo "XMLProcessing MapReduce failed, please check the log" >> $LOGFILE
fi

########################## PIG Processing ########################################
#### Pig splits the data into two parts: category data and ratings data ##########
##################################################################################

echo "Pig script starts here"

echo "PIG script, which splits the data into two parts: category data and ratings data" >> $LOGFILE

hadoop fs -rmr /BookMarkOutput/Type_Computer
hadoop fs -rmr /BookMarkOutput/Type_Database
hadoop fs -rmr /BookMarkOutput/Rating5+
hadoop fs -rmr /BookMarkOutput/Rating5-

pig /home/bhavesh/Bookmark_POC/PIG/bookmarkanalysis.pig

if [ $? -eq 0 ]; then
    echo "Successfully finished PIG processing" >> $LOGFILE
else
    echo "PIG processing failed, please check the log" >> $LOGFILE
fi

############################ HIVE Processing #######################################
###### HIVE will load the category data and ratings data into Hive tables #########
####################################################################################

echo "HIVE script starts here"

echo "HIVE loads the data into two parts: category data tables and ratings data tables" >> $LOGFILE

hive -e 'drop table if exists ComputerBooks';
hive -e 'drop table if exists DatabaseBooks';
hive -e 'drop table if exists Highest_Rating';
hive -e 'drop table if exists Lowest_Rating';

hive -e "create external table ComputerBooks
(Bookid string,
author string,
title string,
genre string,
price float,
publish_date string,
descriptions string,
review string,
rate float,
comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/Computerbooks'";

hive -e "create external table DatabaseBooks
(Bookid string,
author string,
title string,
genre string,
price float,
publish_date string,
descriptions string,
review string,
rate float,
comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/Databasebooks'";

hive -e "create external table Highest_Rating
(Bookid string,
author string,
title string,
genre string,
price float,
publish_date string,
descriptions string,
review string,
rate float,
comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/HighestRating'";

hive -e "create external table Lowest_Rating
(Bookid string,
author string,
title string,
genre string,
price float,
publish_date string,
descriptions string,
review string,
rate float,
comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/LowestRating'";

hive -e "load data inpath '/BookMarkOutput/Type_Computer/part-r-00000' overwrite into table ComputerBooks";
hive -e "load data inpath '/BookMarkOutput/Type_Database/part-r-00000' overwrite into table DatabaseBooks";
hive -e "load data inpath '/BookMarkOutput/Rating5+/part-r-00000' overwrite into table Highest_Rating";
hive -e "load data inpath '/BookMarkOutput/Rating5-/part-r-00000' overwrite into table Lowest_Rating";

############################ SQOOP Processing #######################################
###### Pushing the Hive table data into RDBMS tables via Sqoop ######################
#####################################################################################

# The exported files are comma-delimited (see the Pig STORE statements),
# so the field terminator must be ',' rather than '\t'.
sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table ComputerBooks --export-dir /BookMarkOutput/hive/Computerbooks/part-r-00000 --input-fields-terminated-by ','

sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table DatabaseBooks --export-dir /BookMarkOutput/hive/Databasebooks/part-r-00000 --input-fields-terminated-by ','

sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table HighestRating --export-dir /BookMarkOutput/hive/HighestRating/part-r-00000 --input-fields-terminated-by ','

sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table LowestRating --export-dir /BookMarkOutput/hive/LowestRating/part-r-00000 --input-fields-terminated-by ','

####################################################################################
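
Two details worth spelling out. First, the script loads the data into Hive but never actually queries it; a query in the spirit of the problem statement (find the top-rated books), offered as a sketch rather than part of the original script:

SQL
-- Top 10 books by rating, using the comment count as a tie-breaker.
SELECT title, author, genre, rate, comments
FROM Highest_Rating
ORDER BY rate DESC, comments DESC
LIMIT 10;

Second, sqoop export does not create the target tables, so they must already exist in MySQL with a column layout matching the exported files. A sketch of one such table, mirroring the Hive schema above (the other three differ only in name; the VARCHAR lengths are assumptions):

SQL
CREATE TABLE ComputerBooks (
    Bookid       VARCHAR(50),
    author       VARCHAR(255),
    title        VARCHAR(255),
    genre        VARCHAR(100),
    price        FLOAT,
    publish_date VARCHAR(50),
    descriptions TEXT,
    review       TEXT,
    rate         FLOAT,
    comments     INT
);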