Distributed Data Processing with Hadoop

Eyal Oren — eyal@cs.vu.nl

Data-intensive computing

  • Why? Because the Web is large!
    • Google: MapReduce
    • Yahoo: Hadoop
    • Amazon: EC2, S3
    • Microsoft: Dryad & DryadLINQ
    • IBM: Blue Cloud, Kittyhawk on Blue Gene

  • Why should we care?
  • The Semantic Web is getting large too!

A look at other disciplines

"For the long-term goal, exploiting the Web's potential for being the world's largest knowledge base, XML and Semantic Web are key assets, but by themselves not sufficient. We need to cope with diversity, incompleteness, and uncertainty: we have an absolute need for ranked retrieval, and statistics is key: combine techniques from DB, IR, AI, and ML."
Gerhard Weikum, ER'04, SIGMOD'07

Semantic Web and other disciplines

  • DB: predicate indices, B-trees, Datalog, query planning
    • OntoBroker, Sesame, Virtuoso, Yars
    • RDFBroker, C-Store, MonetDB
  • ML: corpus, data-driven, statistics
    • TextRunner, KnowItAll (Etzioni, UW), Fu & Weld (Wikipedia)
  • IR: inverted indices, statistics, ranking
    • Swoogle, Sindice, Semplore
  • Web: large-scale data processing
    • Distributed execution of high-level algebra over semi-structured data

Computations over massive data sets

  • Sampling: run algorithm over small number of data elements
  • Streaming: stream through data and run algorithm on limited portion
  • Sketching: compress data chunks into "sketches", run algorithm on sketches
    Z. Bar-Yossef
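A minimal sketch of the first and third ideas, using two textbook techniques chosen here for illustration (neither is named on the slide): reservoir sampling (Algorithm R) keeps a uniform sample of k items in a single streaming pass, and a Count-Min sketch summarizes item counts in a fixed-size table that never under-estimates. Function and class names are hypothetical.

```python
import hashlib
import random


def reservoir_sample(stream, k, rng=random):
    """Uniform random sample of k items from a stream of unknown length (Algorithm R)."""
    sample = []
    for i, item in enumerate(stream):
        if i < k:
            sample.append(item)      # fill the reservoir with the first k items
        else:
            j = rng.randint(0, i)    # inclusive bounds: 0 <= j <= i
            if j < k:
                sample[j] = item     # keep item i with probability k / (i + 1)
    return sample


class CountMinSketch:
    """Fixed-size count summary: estimates are >= the true count, never below it."""

    def __init__(self, width=256, depth=4):
        self.width = width
        self.depth = depth
        self.rows = [[0] * width for _ in range(depth)]

    def _index(self, item, row):
        # one salted hash per row; blake2b is deterministic across runs
        digest = hashlib.blake2b(f"{row}:{item}".encode(), digest_size=8).digest()
        return int.from_bytes(digest, "big") % self.width

    def add(self, item, count=1):
        for row in range(self.depth):
            self.rows[row][self._index(item, row)] += count

    def estimate(self, item):
        # collisions only ever add counts, so the minimum over rows is the tightest bound
        return min(self.rows[row][self._index(item, row)] for row in range(self.depth))


if __name__ == "__main__":
    print(reservoir_sample(range(1_000_000), 5, random.Random(42)))
    cms = CountMinSketch()
    for word in "to be or not to be".split():
        cms.add(word)
    print(cms.estimate("to"), cms.estimate("be"), cms.estimate("or"))
```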

MapReduce and friends

  • GFS (HDFS): distributed file system (SOSP03)
  • MapReduce (Hadoop): distributed job dispatching (OSDI04)
  • BigTable (HBase): virtual distributed database table (OSDI06)
  • Sawzall (Pig): high-level algebra (SP2005)

  • Scalable: reliably store and process petabytes
  • Economical: distribute data and processing over commodity nodes
  • Efficient: process data in parallel on the nodes where it is located
  • Reliable: maintain multiple copies, automatically redeploy tasks on failure

MapReduce (1)

  • Map: map(square '(1 2 3 4)) = (1 4 9 16)
  • Reduce: reduce(+ '(1 4 9 16)) = 30
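The Lisp-style notation above maps directly onto Python's built-in `map` and `functools.reduce`; a minimal illustration:

```python
import operator
from functools import reduce


def square(x):
    return x * x


squares = list(map(square, [1, 2, 3, 4]))  # [1, 4, 9, 16]
total = reduce(operator.add, squares)      # 30
print(squares, total)
```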

  • Distribute computation over nodes that contain data
  • Network-aware memory hierarchy (cache, memory, disk, rack, network)
  • Worker failure: heartbeat and reschedule
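A toy sketch of heartbeat-and-reschedule, assuming a single master that records the last heartbeat time per worker and re-queues the tasks of any worker silent for longer than a timeout. The `Master` class and its methods are hypothetical, not Hadoop's JobTracker API; time is passed in explicitly so the example is deterministic.

```python
from dataclasses import dataclass, field


@dataclass
class Master:
    """Toy master (hypothetical): tracks heartbeats, reschedules tasks of silent workers."""
    timeout: float                                 # max silence before a worker is presumed dead
    last_seen: dict = field(default_factory=dict)  # worker -> time of its last heartbeat
    assigned: dict = field(default_factory=dict)   # worker -> set of task ids it is running
    pending: list = field(default_factory=list)    # tasks waiting to be (re)scheduled

    def heartbeat(self, worker, now):
        self.last_seen[worker] = now
        self.assigned.setdefault(worker, set())

    def assign(self, worker, task):
        self.assigned.setdefault(worker, set()).add(task)

    def check_failures(self, now):
        """Declare workers dead after `timeout` and put their tasks back in the queue."""
        dead = [w for w, t in self.last_seen.items() if now - t > self.timeout]
        for w in dead:
            self.pending.extend(sorted(self.assigned.pop(w, set())))
            del self.last_seen[w]
        return dead


m = Master(timeout=10)
m.heartbeat("w1", now=0)
m.heartbeat("w2", now=0)
m.assign("w1", "map-0")
m.assign("w2", "map-1")
m.heartbeat("w1", now=8)         # w2 stays silent from here on
print(m.check_failures(now=12))  # ['w2']
print(m.pending)                 # ['map-1']
```

In the MapReduce paper, completed map tasks of a failed worker are also re-executed, because their output lives on that worker's local disk; this sketch only re-queues the tasks it tracks.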

MapReduce (2)

MapReduce example

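map(String key, String value):
  // key: document name, value: document contents
  for each word w in value:
    emit(w, 1);

reduce(String key, Iterator values):
  // key: a word, values: all counts emitted for that word
  int result = 0;
  for each v in values:
    result += v;
  emit(key, result);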
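The word-count pseudocode can be run locally with a single-process simulation of the three phases (map, shuffle/group-by-key, reduce). `run_mapreduce`, `map_fn` and `reduce_fn` are hypothetical names; a real framework partitions the intermediate keys across many reducers instead of holding them in one dictionary.

```python
from collections import defaultdict


def map_fn(key, value):
    """key: document name (unused), value: document contents."""
    for word in value.split():
        yield word, 1


def reduce_fn(key, values):
    """key: a word, values: all counts emitted for that word."""
    yield key, sum(values)


def run_mapreduce(inputs, mapper, reducer):
    # map phase: apply the mapper to every (key, value) input record
    intermediate = defaultdict(list)
    for k, v in inputs:
        for ik, iv in mapper(k, v):
            intermediate[ik].append(iv)  # shuffle: group values by intermediate key
    # reduce phase: one reducer call per distinct key, in sorted key order
    output = []
    for ik in sorted(intermediate):
        output.extend(reducer(ik, intermediate[ik]))
    return output


docs = [("doc1", "the web is large"), ("doc2", "the semantic web is large too")]
print(run_mapreduce(docs, map_fn, reduce_fn))
# [('is', 2), ('large', 2), ('semantic', 1), ('the', 2), ('too', 1), ('web', 2)]
```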
      

GFS/HDFS (1)

GFS/HDFS (2)

  • Master exposes data placement
  • Hardware failure? Replication
  • High throughput? Streaming data access
  • Large data sets? Tuned for large files
  • Coherency? Write-once-read-many
  • Smart replica placement and selection
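As an example of replica placement, a simplified sketch of the default rack-aware policy described in the HDFS architecture documentation for replication factor 3: first replica on the writer's node (or a random node), second on a node in a different rack, third on another node in that same remote rack. The function name and the `racks` layout are assumptions; real HDFS also weighs free space, load and node health.

```python
import random


def place_replicas(racks, writer=None, rng=random):
    """Pick 3 datanodes with a simplified HDFS-style rack-aware policy.

    racks: dict rack_id -> list of node names; assumes at least one rack other
           than the first replica's rack has >= 2 nodes.
    writer: node writing the block, if it is itself a datanode.
    """
    rack_of = {node: rack for rack, nodes in racks.items() for node in nodes}
    # replica 1: the writer's own node if it is a datanode, otherwise a random node
    first = writer if writer in rack_of else rng.choice(list(rack_of))
    # replica 2: a node on a different (remote) rack that can hold two replicas
    remote_racks = [r for r in racks if r != rack_of[first] and len(racks[r]) >= 2]
    remote = rng.choice(remote_racks)
    second = rng.choice(racks[remote])
    # replica 3: a different node on the same remote rack as replica 2
    third = rng.choice([n for n in racks[remote] if n != second])
    return [first, second, third]


racks = {"rack1": ["n1", "n2"], "rack2": ["n3", "n4"]}
print(place_replicas(racks, writer="n1", rng=random.Random(7)))
```

Putting two replicas on one remote rack limits cross-rack write traffic while still surviving the loss of a whole rack.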

Deployments

  • Google deployment (OSDI'04, CACM'08)
    • 1800 commodity nodes
    • Distributed grep over 1TB: 150s
    • Distributed sort over 1TB: 850s
  • Yahoo deployment (Feb 2008)
    • Input: all crawled pages (200 TB)
    • Output: link graph and metadata (300 TB)
    • >10,000 CPU cores, >1 trillion page links, >5 PB disk usage
    • 30% faster than before (because of Hadoop's rescheduling)
  • Amazon A9, Facebook, Joost, Last.fm, NY Times, Powerset, Veoh

Sawzall

  • High-level algebra: raw data --> filter --> aggregate
  • Compiles into a map (filter) and reduce (aggregate) job
  • Program runs once per record, emitting values into aggregators
  • Aggregators: collection, sum, maximum, top, sample, quantile, unique

x : float = input;
sum_of_squares: table sum of float;
emit sum_of_squares <- x * x;

input: [1, 2, 3, 4]
emit: [1, 4, 9, 16]
sum_of_squares: [30]
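A rough Python model of the Sawzall program, assuming a sum table is an accumulator that can also merge partial results. Because sum is commutative and associative, each machine can aggregate its own shard and the partial tables can be combined afterwards, which is what lets the aggregation run in parallel. `SumTable` and `run_program` are hypothetical names.

```python
class SumTable:
    """Toy stand-in for a Sawzall `table sum of float` aggregator."""

    def __init__(self):
        self.total = 0.0

    def emit(self, value):
        self.total += value

    def merge(self, other):
        # combining partial tables is just another sum
        self.total += other.total


def run_program(records):
    """The per-record program: each record is processed in isolation."""
    table = SumTable()
    for x in records:
        table.emit(float(x) * float(x))  # emit sum_of_squares <- x * x
    return table


# two "machines" each process one shard; their partial tables are then merged
shards = [[1, 2], [3, 4]]
result = SumTable()
for partial in (run_program(s) for s in shards):
    result.merge(partial)
print(result.total)  # 30.0
```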
          

More Sawzall examples

count: table sum of int;
total: table sum of float;
squares: table sum of float;
x : float = input;

emit count <- 1;
emit total <- x;
emit squares <- x * x;
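One common use of these three tables (an assumption here; the slide does not show any post-processing) is to derive the mean and variance of the input from the aggregated sums alone, without revisiting the data:

```python
def summarize(count, total, squares):
    """Mean and population variance computed from the three aggregated sums."""
    mean = total / count
    variance = squares / count - mean * mean  # E[x^2] - E[x]^2
    return mean, variance


data = [1.0, 2.0, 3.0, 4.0]
count = len(data)                   # emit count <- 1
total = sum(data)                   # emit total <- x
squares = sum(x * x for x in data)  # emit squares <- x * x
print(summarize(count, total, squares))  # (2.5, 1.25)
```

The E[x^2] - E[x]^2 form is fine for illustration but loses precision when the values are large relative to their spread.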
        
proto "document.proto"
max_pagerank: table maximum(1) [domain: string] of url: string weight pagerank: int;

doc : Document = input;
emit max_pagerank[domain(doc.url)] <- doc.url weight doc.pagerank;
        
['google.com' => 'google.com/index.html',
 'adobe.com' => 'adobe.com/download/acrobat/',
 'bangkok.com' => 'bangkok.com/nightlife/whatsup.htm']
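A sketch of the `table maximum(1) ... weight pagerank` aggregator, assuming it keeps, for each index value (here the domain), the single emitted URL with the highest weight. Sawzall's `domain()` is approximated with `urllib.parse.urlsplit`, and the pagerank values below are made up for illustration.

```python
from urllib.parse import urlsplit


def domain(url):
    """Hypothetical stand-in for Sawzall's domain(): host part of a scheme-less URL."""
    return urlsplit("//" + url).hostname


def max_pagerank_by_domain(docs):
    """Per domain, keep the url with the highest pagerank (like table maximum(1))."""
    best = {}  # domain -> (pagerank, url)
    for url, pagerank in docs:
        d = domain(url)
        if d not in best or pagerank > best[d][0]:
            best[d] = (pagerank, url)
    return {d: url for d, (_, url) in best.items()}


docs = [  # hypothetical (url, pagerank) records
    ("google.com/index.html", 10),
    ("google.com/about", 7),
    ("adobe.com/download/acrobat/", 8),
    ("adobe.com/products", 5),
]
print(max_pagerank_by_domain(docs))
# {'google.com': 'google.com/index.html', 'adobe.com': 'adobe.com/download/acrobat/'}
```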
        

Yahoo Pig!

  • Yahoo's high-level algebra (unpublished)
    • Tuple (record): string, bag, map, tuple
    • Operators: load, group, cogroup, foreach
  • Open-source, stand-alone or on top of Hadoop
  • Feels like relational operators, similar to Sawzall

VISITS = load '/visits' as (user, url, time);
USER_VISITS = group VISITS by user;
USER_COUNTS = foreach USER_VISITS generate group as user, COUNT(VISITS) as numvisits;
ALL_COUNTS = group USER_COUNTS all;
AVG_COUNT = foreach ALL_COUNTS generate AVG(USER_COUNTS.numvisits);
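The first script computes the average number of visits per user. An in-memory Python equivalent, with hypothetical sample records, shows what each statement produces:

```python
from collections import Counter

visits = [  # hypothetical (user, url, time) records
    ("amy", "cnn.com", 8), ("amy", "bbc.com", 10), ("amy", "flickr.com", 12),
    ("fred", "cnn.com", 12),
]
# USER_VISITS / USER_COUNTS: group VISITS by user, COUNT per group
user_counts = Counter(user for user, _url, _time in visits)
# ALL_COUNTS / AVG_COUNT: one global group, AVG over numvisits
avg_count = sum(user_counts.values()) / len(user_counts)
print(dict(user_counts), avg_count)  # {'amy': 3, 'fred': 1} 2.0
```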
      
VISITS = load '/visits' as (user, url, time);
PAGES = load '/pages' as (url, pagerank);
VISITS_PAGES = join VISITS by url, PAGES by url;
USER_VISITS = group VISITS_PAGES by user;
USER_AVGPR = foreach USER_VISITS generate group, AVG(VISITS_PAGES.pagerank) as avgpr;
GOOD_USERS = filter USER_AVGPR by avgpr > 0.5;
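The second script joins visits with page ranks and keeps users whose visited pages average a pagerank above 0.5. A small in-memory equivalent (hash join, group, filter) with hypothetical data:

```python
from collections import defaultdict

visits = [  # hypothetical (user, url, time) records
    ("amy", "cnn.com", 8), ("amy", "bbc.com", 10),
    ("fred", "snails.com", 12), ("fred", "bbc.com", 13),
]
pages = [("cnn.com", 0.8), ("bbc.com", 0.6), ("snails.com", 0.2)]  # (url, pagerank)

# VISITS_PAGES: inner hash join on url (build a table on PAGES, probe with VISITS)
pages_by_url = defaultdict(list)
for url, pr in pages:
    pages_by_url[url].append(pr)
visits_pages = [(user, url, pr) for user, url, _t in visits for pr in pages_by_url.get(url, [])]

# USER_VISITS / USER_AVGPR: group by user, average pagerank per group
prs_by_user = defaultdict(list)
for user, _url, pr in visits_pages:
    prs_by_user[user].append(pr)
user_avgpr = {user: sum(prs) / len(prs) for user, prs in prs_by_user.items()}

# GOOD_USERS: filter avgpr > 0.5
good_users = sorted(user for user, avg in user_avgpr.items() if avg > 0.5)
print(good_users)  # ['amy']
```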
      

Back to the Semantic Web

  • Offline distributed processing:
    • Sindice: process RDF and submit to Solr/Lucene index
    • Pig-based RDFS: load triples, deductive closure, SPARQL query
    • Mahout distributed ML algorithms
  • Online distributed indexing:
    • YARS2 with SPARQL (demo at swse.deri.org): read-only distributed sparse indices
    • RDF in C-Store columns (VLDB 2007 best paper)
    • HBase RDF: store RDF in distributed HBase tables
    • RDF in MonetDB (not yet)
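To give a flavor of the deductive-closure step, a minimal sketch of RDFS reasoning as repeated MapReduce-style passes: each pass keys triples by the class they share (map) and combines them per class (reduce), until no new triples appear. It covers only two RDFS rules (rdfs9: type inheritance through `rdfs:subClassOf`; rdfs11: transitivity of `rdfs:subClassOf`), not full RDFS entailment, and it is not the Pig-based implementation mentioned above. The `ex:` resources are hypothetical.

```python
from collections import defaultdict

TYPE = "rdf:type"
SUBCLASS = "rdfs:subClassOf"


def rdfs_pass(triples):
    """One MapReduce-style pass applying rdfs9 and rdfs11 by joining on the shared class."""
    # map: key each triple by the class in the join position
    groups = defaultdict(lambda: ([], []))  # class C -> (things below C, superclasses of C)
    for s, p, o in triples:
        if p in (TYPE, SUBCLASS):
            groups[o][0].append((s, p))     # s is an instance or subclass of o
        if p == SUBCLASS:
            groups[s][1].append(o)          # o is a direct superclass of s
    # reduce: per class, pair everything below it with everything above it
    derived = set()
    for below, supers in groups.values():
        for s, p in below:
            for d in supers:
                derived.add((s, p, d))      # rdfs9 if p is TYPE, rdfs11 if p is SUBCLASS
    return derived


def rdfs_closure(triples):
    """Iterate passes until a fixpoint: no pass derives a triple we do not already have."""
    closure = set(triples)
    while True:
        new = rdfs_pass(closure) - closure
        if not new:
            return closure
        closure |= new


triples = {
    ("ex:eyal", TYPE, "ex:Researcher"),
    ("ex:Researcher", SUBCLASS, "ex:Person"),
    ("ex:Person", SUBCLASS, "ex:Agent"),
}
for t in sorted(rdfs_closure(triples) - triples):
    print(t)
```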