Data Intensive Ch10 - Batch processing Flashcards

1
Q

Context map

A

Derived data

Distributed filesystems

MapReduce workflows

Search Indexes
Recommendation engines

Machine Learning

2
Q

Summary

A

The chapter looked briefly at Unix tools such as awk, grep and sort as an example of a design philosophy that carried over into MapReduce and, later, dataflow engines.
Key ideas are:
- inputs are immutable
- outputs are intended to become the input of another, as-yet-unspecified program (Unix pipelines)
- complex problems are broken down into smaller pieces that can be solved using tools doing one thing well

Uniform interface
Unix: files and pipes
MapReduce: HDFS

Two main problems of distributed batch processing frameworks:
-> Partitioning
MapReduce mappers are partitioned according to input file blocks
Output of mappers is repartitioned, sorted and merged into a configurable number of REDUCER partitions
Purpose: bring all the related data (records grouped by the same key) together
Later dataflow engines avoid sorting unless it is required, but otherwise work in a similar manner

-> Fault tolerance
MapReduce frequently writes to disk. This makes it easy to recover from an individual failed task without restarting the whole job.

Dataflow engines perform less materialization of intermediate state and keep more in memory. If a node fails, they need to recompute more. Deterministic operators reduce the amount of data that has to be recomputed.

MapReduce join algorithms (also used in MPP DBs or dataflow engines)
-> Sort-Merge join
Mapper extracts the join key. All records with the same key end up going to the same reducer call which outputs joined records.
-> Broadcast hash join
One of the two join inputs is small, so it is not partitioned: it is loaded entirely into a hash table. Each mapper loads the hash table and joins it with its input records.
-> Partitioned hash join
Prerequisite: Both inputs are partitioned in the same way. Hash table approach per partition.

Distributed batch processing engines have an intentional restriction on programming model. All callback functions (mappers, reducers) are assumed to:
- be stateless
- have no visible side effects besides the output
This allows safe retries in the face of crashes/network issues hidden behind framework abstraction. Output of any failed task can be safely discarded.
Processing code can be oblivious to the fault-tolerance mechanisms.

Distinguishing feature of batch processing - reads IMMUTABLE input of FIXED SIZE (data is from some POINT IN TIME/DB SNAPSHOT) and produces some output.
Output is DERIVED from input.
Jobs are said to have BOUNDED input, the opposite of stream processing, where input is UNBOUNDED (never-ending streams of data).

3
Q

Types of systems

A
-> Services - online systems
Waits for a request, returns a response
Latency optimized
-> Batch processing (offline)
Takes large, known amount of data and runs job to process it and output some result
Periodically scheduled
Throughput matters
Runs for minutes to days
-> Stream processing (near real time)
Somewhere in between service and batch
Consumes unbounded input
Produces some output
The job picks up events promptly after they happen
4
Q

Unix philosophy

A
  1. Make each program do one thing well. New job -> build afresh, do not complicate old programs with new features
  2. Expect the output of every program to become the input to another, unknown program. Don’t clutter output with extraneous info. Avoid columnar/binary input formats. Don’t insist on interactive input
  3. Design and build software to be tried early. Remove clumsy parts, rebuild if needed.
  4. Use tools to lighten a programming task

Sounds like Agile & DevOps movements

Sort, uniq -> Unix shell like bash -> COMPOSE small programs into data processing jobs
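
A minimal Python sketch of the same composition idea - counting the most frequent URLs in a web server log, analogous to the classic awk '{print $7}' | sort | uniq -c | sort -rn | head pipeline. The file name access.log and the URL field position are assumptions:

```python
# Hypothetical equivalent of: awk '{print $7}' access.log | sort | uniq -c | sort -rn | head -n 5
from collections import Counter

def top_urls(path="access.log", n=5):
    counts = Counter()
    with open(path) as f:
        for line in f:
            fields = line.split()
            if len(fields) > 6:          # assume the URL is the 7th whitespace-separated field
                counts[fields[6]] += 1
    return counts.most_common(n)

if __name__ == "__main__":
    for url, count in top_urls():
        print(count, url)
```

Note the trade-off: the Counter keeps every distinct key in memory, whereas the sort-based Unix pipeline spills to disk and therefore scales past available memory.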

5
Q

Unix Uniform interface

A
File (descriptor)
Ordered sequence of bytes
Can be:
- actual file in FS
- communication channel to another process (socket, stdin, stdout)
- device driver
- socket for TCP connection

Many programs assume the sequence of bytes is ASCII text, with each record separated by a newline

Stdin/stdout (by default - console input and terminal screen)
Pipes attach the stdout of the left process to the stdin of the right process
Stdin/Stdout can be redirected, program doesn’t worry about concrete files
Loose coupling/Inversion of Control

Immutable input files

Pipeline can be inspected at any point (insert less cmd)

Output of some pipeline stage can be written to a file which can become input for the next stage
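
A minimal sketch of a program written against this uniform interface: it only reads lines from stdin and writes lines to stdout, so it can be dropped anywhere into a pipeline without knowing what produces or consumes its data (the file name upper.py is hypothetical):

```python
# upper.py - toy filter that uppercases every line; usable anywhere in a pipeline:
#   cat somefile | python upper.py | sort | less
import sys

for line in sys.stdin:
    sys.stdout.write(line.upper())
```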

6
Q

Map reduce and Unix similarities

A

MR - distributed across potentially thousands of machines
MR job is like Unix process - take input, produce output
MR - job usually does not modify input; no side effects

Stdin/Stdout for MR are always files in distro FS (HDFS)

MR allows a computation to be parallelized across many machines. The programmer does not need to explicitly write any code for parallelization

Mappers/reducers operate on one record at a time. They don’t explicitly request any file reads from HDFS; the framework provides the proper input streams.
Input is typically a directory in HDFS
Each file/file block is considered a separate partition that can be processed by separate map task

Unix pipes become MR workflows

7
Q

HDFS is based on the … principle

A

Shared nothing
No special hardware requirement - commodity hardware is used
Computers connected by a conventional datacenter network

8
Q

HDFS basics

A

Set of daemon processes running on each machine
Each exposes network service to access files stored on that machine
Central server - the NameNode - keeps track of which file blocks are stored on which node
File blocks are replicated to multiple machines

Use of commodity hardware and open-source software - fairly low cost

9
Q

Map Reduce Job Execution

A
  1. Read input files and break them into records (like by \n character)
  2. Call mapper function for each record. This extracts key and value.
  3. Sort all key-value pairs by key
  4. Call the reducer function. It iterates over the sorted key-value pairs; if a key occurs multiple times, the reducer is called once with that key and the list of all associated values.
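
A minimal single-process sketch of these four steps as a word count, assuming input files live in a hypothetical input/ directory; in real MapReduce each step runs distributed and spills to disk:

```python
# Toy word count following the four steps above (single process, in memory).
import glob
from itertools import groupby
from operator import itemgetter

def mapper(record):
    for word in record.split():          # step 2: extract key-value pairs from a record
        yield (word, 1)

def reducer(key, values):
    yield (key, sum(values))             # step 4: aggregate all values for a key

pairs = []
for path in glob.glob("input/*.txt"):    # step 1: read input files, break into records
    with open(path) as f:
        for record in f:                 # one record per line
            pairs.extend(mapper(record))

pairs.sort(key=itemgetter(0))            # step 3: sort all key-value pairs by key

for key, group in groupby(pairs, key=itemgetter(0)):
    for output in reducer(key, (value for _, value in group)):
        print(output)
```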
10
Q

MapReduce scheduler principle

A
Put the computation near the data
It tries to run each mapper on a machine that also stores a replica of the input file (as long as that machine has enough spare RAM and CPU)
Saves copying files over network
Reduces network load
Increases locality
11
Q

How is MapReduce number of map/reduce tasks determined?

A

Map - number of input file blocks

Reduce - configured by a job author

12
Q

How does MapReduce sort work?

A

The dataset is assumed to be too large to sort in a single machine’s memory
Staged sorting:
1. Each map task PARTITIONS its output by reducer
- by hash of key
2. Each partition is written to the mapper’s local disk as a sorted file (cf. SSTables / LSM-trees)
- so each mapper produces one local sorted file of data to be sent to each reducer
3. When a mapper is done with its input file and has written all its output, the MR scheduler notifies the reducers that they can start fetching the output from that mapper
4. The reducer merges the individually sorted files from all mappers. The sort order is preserved, so equal keys end up adjacent in the resulting file
5. The reducer is called with a key and an ITERATOR that scans over all records with that key (they might not fit in memory, hence the iterator)

Steps 2 and 3 are called the shuffle
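
A toy sketch of this staged sorting, assuming 3 reducers and two in-memory "mappers": each mapper hash-partitions and sorts its output, and each reducer merges the already-sorted runs it receives:

```python
# Sketch of the shuffle bookkeeping (3 reducers, string keys).
import heapq
from zlib import crc32

NUM_REDUCERS = 3

def partition(key):
    # stable hash so the same key always goes to the same reducer
    return crc32(key.encode()) % NUM_REDUCERS

def mapper_output(pairs):
    buckets = [[] for _ in range(NUM_REDUCERS)]
    for key, value in pairs:
        buckets[partition(key)].append((key, value))
    return [sorted(bucket) for bucket in buckets]   # one sorted run per reducer

# two mappers producing partitioned, sorted output
m1 = mapper_output([("b", 1), ("a", 2), ("c", 3)])
m2 = mapper_output([("a", 5), ("b", 7)])

for r in range(NUM_REDUCERS):
    # reducer r merges the sorted runs from all mappers; equal keys end up adjacent
    merged = list(heapq.merge(m1[r], m2[r]))
    print(f"reducer {r}:", merged)
```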

13
Q

MapReduce workflows

A

Chained MR jobs (a single job is of limited usefulness)
The output of one job is the input to another. There is no particular support for this; it is usually done by convention: the output directory name of job 1 is configured as the input directory of job 2

From MR’s perspective the two jobs are independent, so job 2 cannot start until job 1’s output has fully materialized (i.e. the whole job completed successfully)
Third-party tools such as Airflow or Luigi manage the dependencies between jobs
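
A sketch of the directory-name convention, where run_job is a hypothetical stand-in for submitting a job and blocking until all of its tasks have finished (the job names and paths are made up):

```python
# Hypothetical workflow wiring: job 2's input directory is job 1's output directory.
def run_job(job_name, input_dir, output_dir):
    print(f"running {job_name}: {input_dir} -> {output_dir}")
    # ... submit the job and block until ALL of its tasks complete ...

# job 2 can only start once job 1's output has fully materialized
run_job("extract-clicks", input_dir="/logs/2024-06-01", output_dir="/tmp/clicks")
run_job("count-per-url",  input_dir="/tmp/clicks",      output_dir="/reports/url-counts")
```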

14
Q

Equi-joins

A

Basically a join using the “=” operator
A type of join where a record is associated with another based on having an identical value in some particular field, such as an ID.
Other kinds of joins also exist (like using less-than operator)

15
Q

We need joins to associate records but… why bother splitting records? Why not keep them all together? Why not denormalize data?

A

Having separate tables = data normalization
The same data is often needed in different contexts
If copies of the same data are kept in multiple records, updating becomes difficult

Keeping all data together? Inefficient to query, and many rows may be associated with the same data
Example: user + organisation - they change for different reasons
Many users would reference the same organisation
You could keep every user together with its organisation
But then it is hard to query efficiently
What if users are grouped into groups? Do we keep groups with organisations?
What if a user can be in a group from an external organisation?

16
Q

What is a join in the context of batch processing?

A

Resolving ALL occurrences of some ASSOCIATION within a dataset.
For request/response we usually look up data for one particular user at a time. An index can be used to speed up the lookup.
When doing batch processing - we look up data for ALL users SIMULTANEOUSLY

Example:
Associate users from database with respective activity from clickstream events data.
A kind of star schema: events are the facts, the user is a dimension
Embedding all profile info, such as age (useful for stats), in every event is too wasteful
- lots of data, so the memory impact is large

17
Q

What’s wrong with querying db for records to be joined in batch job?

A

Will likely suffer from poor performance

  1. Low processing throughput due to round-trips to the DB
  2. Caching could help, BUT the distribution of the data matters - records for the same user may be too far apart to stay within the cache’s size/lifetime
  3. Running many queries in parallel could overwhelm the DB
  4. Could be nondeterministic - the same data could change between requests; a consistent snapshot would not be read.

Solution:
Computation should be as local to the processing machine as possible.
Take a copy/snapshot of the DB into the data warehouse - the ETL process

Use sort-merge MR join

18
Q

Sort-merge joins

A

AKA Reduce side join
Fig 10-3 p405
Assume the DB records to be joined (e.g. user details) and the facts (user events) are separate sets of files in HDFS
One mapper for user events
One mapper for user details from the DB (e.g. date of birth)
The mappers emit tuples (user id, event) and (user id, user details) for the reducers to join

A secondary sort puts the user details first, so the reducer gets the details as the first element of the values iterator
The reducer can then emit e.g. (viewed url, viewer age) pairs as input for another MR job that computes statistics

+ no requests over the network to get the joined record
+ only one user record in memory at a time
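
A single-process sketch of this reduce-side join over toy in-memory data; in real MapReduce the sort and the per-key grouping happen in the shuffle:

```python
# Toy sort-merge (reduce-side) join of user details with user events.
from itertools import groupby
from operator import itemgetter

users = {1: {"dob": "1990-01-01"}, 2: {"dob": "1985-06-15"}}
events = [(1, "/home"), (2, "/buy"), (1, "/buy")]

# "mappers" tag each record; tag 0 sorts before tag 1, so the user details
# reach the "reducer" before that user's events (the secondary-sort trick)
mapped = [(uid, (0, details)) for uid, details in users.items()]
mapped += [(uid, (1, url)) for uid, url in events]

mapped.sort(key=lambda kv: (kv[0], kv[1][0]))

for uid, group in groupby(mapped, key=itemgetter(0)):
    details = None
    for _, (tag, payload) in group:
        if tag == 0:
            details = payload                       # first element: the user record
        else:
            print((payload, details["dob"]))        # e.g. (viewed url, date of birth)
```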

19
Q

Bring related data together in the same place

A

One way to look at the sort-merge join: mappers “send messages” to reducers, and the key is the destination address to which the value should be delivered.
Problems:
Data skew - lots of records fall under a single key, e.g. in a social network: usually a few hundred connections per person, but millions for celebrities

20
Q

Group by

A

The same “bring related data together in the same place” idea
A good fit for MR workflows
The reducer gets all records with a given key and can do aggregations such as:
- counting the number of records per group (e.g. page views)
- adding up the values in one particular field
- picking the top k records according to some ranking function

Sessionization - bring all activity for a given user session to the same place to analyse the sequence of actions (see the sketch below)
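
A toy sessionization sketch, assuming events are (user, timestamp, url) tuples and that 30 minutes of inactivity starts a new session:

```python
# Group each user's events together, then split them into sessions.
from itertools import groupby
from operator import itemgetter

SESSION_GAP = 30 * 60  # seconds of inactivity that ends a session

events = [
    ("alice", 1000, "/home"), ("alice", 1200, "/buy"),
    ("alice", 9000, "/home"), ("bob", 1100, "/home"),
]

events.sort(key=itemgetter(0, 1))        # bring each user's events together, in time order

for user, user_events in groupby(events, key=itemgetter(0)):
    session, last_ts = [], None
    for _, ts, url in user_events:
        if last_ts is not None and ts - last_ts > SESSION_GAP:
            print(user, session)         # emit the completed session
            session = []
        session.append(url)
        last_ts = ts
    print(user, session)                 # emit the final session for this user
```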

21
Q

Hot keys/linchpin objects

A

Disproportionately active DB records (e.g. celebrities in social networks)

22
Q

Hot spots/skews

How to handle them?

A

Collecting all activity related to a hot key in a single reducer creates a hot spot.
Subsequent MR jobs are held back, as they must wait for the slowest reducer to finish processing the hot spot.

Skewed join - first determines which keys are hot in a sampling job
Hot keys are then spread across random reducers
Each reducer performs a partial aggregation for each hot key
A second MR job is needed to combine the partial aggregations
Caveat: if a join is required, the other join input for a hot key must be replicated to ALL reducers handling part of that hot key (see the sketch below).
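
A sketch of the hot-key "salting" idea, assuming the hot keys are already known from a sampling job; a second job then has to combine the partial aggregates per original key:

```python
# Spread a hot key across several reducers by appending a random suffix ("salt").
import random

HOT_KEYS = {"celebrity_42"}   # hypothetical output of the sampling job
NUM_SALTS = 10

def salted_key(key):
    # hot keys fan out to NUM_SALTS pseudo-keys, each (likely) handled by a
    # different reducer; normal keys are left untouched
    if key in HOT_KEYS:
        return f"{key}#{random.randrange(NUM_SALTS)}"
    return key

def original_key(key):
    return key.split("#", 1)[0]          # the second-stage job groups by this

print(salted_key("celebrity_42"))        # e.g. celebrity_42#7
print(salted_key("ordinary_user"))       # ordinary_user
print(original_key("celebrity_42#7"))    # celebrity_42
```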

23
Q

Reduce-side vs map-side joins

A

Reduce-side: no need to make any assumptions about the input data. Whatever its structure or properties, the mappers can prepare the data to be joined on the reducer
Downside: the shuffle phase involves a lot of expensive disk I/O

Map-side join - based on cut-down MR job (no reducers -> no shuffle -> no sorting)
Requires MAKING ASSUMPTIONS about input data (like one dataset is small and fits memory completely)
Each mapper reads one input block and outputs one file to the DFS

24
Q

Map-side joins

A

Broadcast hash joins
A large dataset is joined with a small dataset (one that fits in the memory of EACH mapper)
Broadcast because the small dataset is “broadcast” to all partitions of the large input (i.e. to all mappers)
Hash because a hash table is used to keep the data in memory
Alternative: the small input is stored in a read-only index on the local disk. Frequently accessed parts of the index remain in the OS page cache, so this is almost as fast as a hash table without requiring the data to fit in memory

Partitioned hash joins
Same as broadcast, except each mapper loads only the partition of the small dataset relevant to its input partition
Requires both datasets to be partitioned in the same manner (same number of partitions, partitioned on the same key with the same hash function)

Map-side merge joins
Applicable if the input datasets are partitioned in the same way AND SORTED on the same key (i.e. the input is already partitioned and sorted on the same key)
It’s like a sort-merge join, except done in the mapper
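
A sketch of the broadcast hash join inside a single mapper, with hypothetical users.csv (the small side, loaded entirely into a hash table) and events_partition_0.csv (one partition of the large side) as the inputs:

```python
# Broadcast hash join: stream the large input, look each record up in memory.
user_details = {}                           # the "broadcast" side, kept in a hash table
with open("users.csv") as f:
    for line in f:
        user_id, dob = line.rstrip("\n").split(",")
        user_details[user_id] = dob

with open("events_partition_0.csv") as f:   # one partition of the large input
    for line in f:
        user_id, url = line.rstrip("\n").split(",")
        dob = user_details.get(user_id)     # local lookup, no network round-trip
        if dob is not None:
            print(url, dob)                 # joined output record
```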

25
Q

What outputs is batch processing good for?

A

-> search indexes
In a simplified view, mappers produce (term, doc id) tuples and reducers build the term dictionary: term -> [all doc ids]
Simplified because it doesn’t take relevance, misspellings, synonyms etc. into account
Also, incremental search-index rebuilds are usually needed

-> ML results
The output of such ML jobs is usually a database itself (e.g. a key-value store)
Making a network request per record in the reducer slows down throughput and could overwhelm the DB (many MR tasks run in parallel).
Also, MR usually assumes it can re-run any failed task and that failed output can be safely discarded. Making network calls to an external DB causes visible side effects, so tasks cannot be retried cleanly.
Solution -> the job should output brand-new database data files, which can then be bulk-loaded onto the servers
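
Returning to the search-index case above, a minimal sketch of the simplified construction, with toy in-memory documents standing in for files in HDFS:

```python
# Toy inverted index: "map" emits (term, doc id), "reduce" collects doc ids per term.
from collections import defaultdict

docs = {1: "cheap flights to rome", 2: "rome city guide", 3: "cheap hotels"}

pairs = []
for doc_id, text in docs.items():        # map: one (term, doc id) pair per distinct word
    for term in set(text.split()):
        pairs.append((term, doc_id))

index = defaultdict(list)                # reduce: term -> list of doc ids
for term, doc_id in sorted(pairs):
    index[term].append(doc_id)

print(dict(index))                       # e.g. {'cheap': [1, 3], 'rome': [1, 2], ...}
```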

26
Q

What is the philosophy of batch process outputs?

A

Avoid side effects such as writing to external DB or sending an email
Treat inputs as immutable
Clean-all-or-nothing output
Separation of logic from wiring (input & output) like in Unix pipes

+ better performance (throughput; no waiting on per-record requests)
+ safe to rerun
+ maintainability
+ if a bug slips into the code and wrong output is produced, the code can be rolled back to the previous version and the job re-run; the output will be correct this time -> HUMAN FAULT TOLERANCE
– OLTP DBs do not have this: if corrupted data gets into the DB, simply rolling back the code does not correct the data
+ MINIMIZED IRREVERSIBILITY
+ the same input files can be used for various different jobs

27
Q

Hadoop vs Distributed Databases

A

DDBs (using MPP - massively parallel processing) were dedicated to running analytic SQL queries on a cluster of machines
MR + HDFS provides something like general-purpose OS for arbitrary programs
The Hadoop ecosystem also comes with different DBs on top of HDFS (OLTP: HBase; MPP-style OLAP: Impala)

Diverse storage:
MPP DBs require a strict data model (structure up front)
MR allows a “dump now, figure the format out later” approach (files are just sequences of bytes, after all)
High-quality data might seem important for the end user, BUT making data available quickly has proven to be MORE VALUABLE than trying to come up with the IDEAL model UP FRONT
MR is good for ETL: transaction data from the DB is dumped in raw form, then MR jobs clean it up and transform it into a relational form to be imported into an MPP warehouse

Diverse processing models:
Not all the processing can be expressed as SQL. Analytical queries - sure. DBs are good for that.
ML systems usually require some app-specific coding so MR wins here.
There is also the Hive project for running SQL on top of HDFS + MR.

Designed for failing:
If a node in an MPP database crashes, the whole query must be restarted.
If one mapper fails, restarting the whole job is usually unacceptable. MR can handle mapper/reducer task failures: failed tasks are restarted independently.
Good for large data processing.
MR tasks can be preempted -> freedom to terminate tasks arbitrarily (useful for better resource utilization: if a higher-priority job appears in the queue, currently running MR tasks can simply be killed and rerun later)

28
Q

MapReduce downsides

A

Clunky API - many abstractions/tools on top to ease the chores (Hive, Pig, Cascading, Crunch)

Materialization of Intermediate State - lots of files are written back and forth for complex MR workflows
(Unix pipes don’t materialize much intermediate state - they STREAM the output of one command incrementally as input to the next)
It is an issue for the DFS: lots of ephemeral files that are nonetheless replicated across several nodes -> overkill

Loose coupling of input/output is not always useful if only one well-known job is consuming the data

An MR job cannot start until ALL tasks in the preceding jobs have completed. Straggler tasks (e.g. from hot-spot handling) stall the whole workflow

Mappers can be redundant - sometimes the previous job’s reducer could already produce output ready for the next reducer. Instead, an identity mapper must be used…

29
Q

Dataflow engines

A

Try to fix some downsides of MR
Spark, Flink, Tez

Dataflow because they model the flow of data through several processing stages

Repeatedly call a user-defined function to process ONE record at a time on a SINGLE thread
Parallelization is done by PARTITIONING inputs
Output of one function becomes input to another

UNLIKE MR, there are no strictly alternating map and reduce stages
FLEXIBLE to assemble - functions are operators
Operations like repartitioning, sorting

Expensive work like sorting is done only where it’s actually needed
No unnecessary map tasks if the work can be done by the preceding reduce operator (a mapper does not repartition the data)
All joins and data dependencies in a workflow are explicitly declared. The scheduler has an overview of where which data is needed, so it can attempt LOCALITY optimizations (if a task consumes data produced by the previous operator, schedule it on the same node and use a shared memory buffer!)
Intermediate state mostly in-memory or on local disk
Operators can stream - execute as soon as some input for them is ready
Existing JVM processes can be reused - no need to launch new JVM per task

Fault tolerance:
Less intermediate state is materialized, but if some task fails its state can be recomputed from data in prior stages (which might still be available on a different machine) or recomputed from scratch
RDD (resilient distributed dataset) - Spark’s abstraction for tracking the ancestry (lineage) of data. It enables recomputation: there is a record of which input partitions were used, which operators were applied, etc.
Nondeterministic operators are tricky - a rerun can produce a slightly different result! Sometimes it is better to materialize the data (ESPECIALLY if it is smaller than the source data)
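
A minimal PySpark word-count sketch of the dataflow style (assuming pyspark is installed and a local input.txt exists): the whole operator chain is declared up front, so the engine can plan its stages, keep intermediate state in memory where possible, and shuffle only where needed:

```python
# Word count as a Spark dataflow; the RDD tracks its lineage for recomputation on failure.
from pyspark import SparkContext

sc = SparkContext("local", "wordcount")

counts = (
    sc.textFile("input.txt")
      .flatMap(lambda line: line.split())
      .map(lambda word: (word, 1))
      .reduceByKey(lambda a, b: a + b)   # the only operator that needs a shuffle here
)

print(counts.collect())
sc.stop()
```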

30
Q

Dataflow processing vs graph processing

A

Dataflow - job is directed acyclic graph of operators

Graph processing - the input data itself is in the form of a graph

31
Q

Graph processing of data in HDFS

A

PageRank-like algorithms
Iterative - repeat until done
Traverse one edge at a time, join a vertex with its adjacent vertex to propagate some info, then repeat until some condition is met (no more edges to follow, or some metric converges)
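
A toy iterative PageRank-style loop over a small in-memory graph; graph-processing frameworks run this same "propagate along edges, repeat until convergence" pattern in a distributed fashion:

```python
# Iterate: push each vertex's rank along its outgoing edges until the ranks converge.
graph = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
rank = {v: 1.0 / len(graph) for v in graph}
DAMPING = 0.85

for _ in range(50):                                   # repeat until done
    incoming = {v: 0.0 for v in graph}
    for v, edges in graph.items():
        share = rank[v] / len(edges)
        for dst in edges:                             # traverse one edge at a time
            incoming[dst] += share
    new_rank = {v: (1 - DAMPING) / len(graph) + DAMPING * incoming[v] for v in graph}
    if max(abs(new_rank[v] - rank[v]) for v in graph) < 1e-6:
        break                                         # the metric has converged
    rank = new_rank

print(rank)
```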