Bidding Auction Data Analytics in Apache Spark

In this article, we explore bidding auction data analysis. The dataset comes from eBay-like online auctions, and even with small data it yields valuable insights. The analysis uses Spark RDDs in Databricks.
In this activity, we load the data into Apache Spark and inspect it using Spark transformations and actions. We use the SparkContext method textFile to load the data into a Resilient Distributed Dataset (RDD).
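As a minimal loading sketch in Java (the local master setting and file name are placeholders; the full program appears below):

SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> inputRDD = sc.textFile("auctiondata.csv"); // file name is a placeholder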

Our dataset is a .csv file that consists of online auction data. Each auction has an auction id associated with it and can have multiple bids. Each row represents a bid. For each bid, we have the following information:

Attribute Information (Dataset Details):

Column     Type     Description
Aucid      String   Auction ID
Bid        Float    Bid amount
Bidtime    Float    Time of bid from start of auction
Bidder     String   The bidder’s userid
Bidrate    Int      The bidder’s rating
Openbid    Float    Opening price
Price      Float    Final price
Itemtype   String   Item type
Dtl        Int      Days to live
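Each line of the .csv file is one bid record in the column order above, so individual fields can be recovered with split(","). A small sketch, given a single record line from the RDD (variable names are illustrative):

String[] fields = line.split(",");
String aucid = fields[0];                  // auction id (used for questions 4 and 7-9 below)
float bid = Float.parseFloat(fields[1]);   // bid amount
String itemtype = fields[7];               // item type (used for questions 5 and 6 below)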

We load this data into Spark using RDDs.

Objectives
• Load data into Spark
• Use transformations and actions to inspect the data

What transformations and actions would you use in each case?

  1. How do you see the first element of the input RDD?
  2. What do you use to see the first 5 elements of the RDD?
  3. What is the total number of bids?
  4. What is the total number of distinct items that were auctioned?
  5. What is the total number of item types that were auctioned?
  6. What is the total number of bids per item type?
  7. Across all auctioned items, what is the minimum number of bids?
  8. Across all auctioned items, what is the maximum number of bids?
  9. What is the average number of bids?
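
In outline, the questions map to the following transformations and actions (a sketch; the RDD names are illustrative and correspond to RDDs built in the full program below):

inputRDD.first();                       // 1. first element
inputRDD.take(5);                       // 2. first 5 elements
inputRDD.count();                       // 3. total number of bids
auctionIds.distinct().count();          // 4. distinct auctioned items
items.distinct().count();               // 5. distinct item types
pairs.reduceByKey((a, b) -> a + b);     // 6. bids per item type
// 7-9. min, max, and average computed over the per-auction bid counts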

Input file (contains 10,654 records)

The Spark code is written in Java.

package BiddingDataAnalysis;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;

import scala.Tuple2;

public class Bidding 
{
 public static void main(String[] args) throws Exception
 {
  if (args.length<1)
  {
   System.err.println("Usage: Auction Bid <File> ");
   System.exit(1);
  }
  
  SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
  JavaSparkContext sc = new JavaSparkContext(conf);
  
  JavaRDD<String> diskfile = sc.textFile(args[0]);
  
  /* How do you see the first element of the input RDD? */
  
  System.out.println("First element = " + diskfile.first());
  
  /* What do you use to see the first 5 elements of the RDD? */
  
  System.out.println("First five elements = " + diskfile.take(5));

  /* What is the total number of bids? */
  
  System.out.println("Total number of bids = " + diskfile.count());
  
  JavaRDD<String> words = diskfile.flatMap(new FlatMapFunction<String, String>() {
        public Iterator<String> call(String s) {
          return Arrays.asList(s.split(",")[0]).iterator();
        }
      });
   
  System.out.println("Total number of distinct item that were auctioned =" +words.distinct().count());
  
  /* What is the total number of item types that were auctioned? */
  
  JavaRDD<String> items = diskfile.flatMap(new FlatMapFunction<String, String>() {
   public Iterator<String> call(String s){
    return Arrays.asList(s.split(",")[7]).iterator();
   }
  });
  
  System.out.println("Total number of Item types that were auctioned = " + items.distinct().count());
 
  /* What is the total number of bids per item type? */
  
  // Pair each item type with a count of 1 ...
  JavaPairRDD<String, Integer> pairs = items.mapToPair(new PairFunction<String, String, Integer>() {
   public Tuple2<String, Integer> call(String t) throws Exception {
    return new Tuple2<>(t, 1);
   }
  });
  
  // ... then sum the counts per item type.
  JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
   public Integer call(Integer v1, Integer v2) throws Exception {
    return v1 + v2;
   }
  });
  
  System.out.println("Total number of bids per item type = " + counts.collect());
  
  JavaPairRDD<Integer,Integer> auctionidkey = diskfile.mapToPair(s-> new Tuple2(s.split(",")[0],1));
  
  JavaPairRDD<Integer, Integer> countgroup = auctionidkey.reduceByKey((a,b)->a+b);
  
  JavaRDD<Integer> onlycount = countgroup.values();
  
  
  JavaRDD<Integer> sortedcount = onlycount.sortBy(new org.apache.spark.api.java.function.Function<Integer,Integer>() {    
   @Override
   public Integer call(Integer v1) throws Exception {
    // TODO Auto-generated method stub
    return v1.intValue();
   }
    }, true, 1 );
  
   System.out.println("Across all auctioned items,the minimum number of bids" + sortedcount.first());
   
   /* Across all auctioned items, what is the maximum number of bids? */
   
   Integer max = sortedcount.reduce(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
     // Keep the larger of the two counts.
     return (v1 > v2) ? v1 : v2;
    }
   });
   
   System.out.println("Across all auctioned items, the maximum number of bids = " + max);
   
   /* What is the average number of bids? */
   
   // Sum all per-auction bid counts ...
   Integer summationcount = sortedcount.reduce(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
     return v1 + v2;
    }
   });
   
   // ... and divide by the number of auctions.
   int totalcount = (int) sortedcount.count();
   
   float average = (float) summationcount / totalcount;
   
   System.out.println("Across all auctioned items, the average number of bids = " + average);
   
   sc.stop();
   }
  
}

Output

By Bhavesh