Distributed Systems

Note: This course was taken during the fall semester of 2017


Exam 2 Notes

Mutual Exclusion

Mutual Exclusion (Mutex) - giving processes the ability to be granted exclusive access to a certain resource.

Algorithms which provide mutex should be designed to avoid process starvation and to ensure that exactly one process gets the resource even if multiple processes request it concurrently.

The algorithms should be fair, in that every process which requests a resource will eventually get a chance to have it. If this does not occur, we refer to it as starvation.

Another name for a mutex is a lock, because it locks out other processes. If there is a time limit on the lock then we call it a lease.

Categories of mutex algorithms:

Lamport’s Mutex Algorithm: requires a process that wants a resource to send a timestamped request for the resource to all processes in the group (including itself). Each process immediately acknowledges any request message it receives and places the request in a priority queue sorted by Lamport timestamp.

Essentially, the scheme involves broadcasting your request so that all members can construct the same locking queue; when you need lock access, you simply check whether your request is at the top of the queue. Based on reliable multicast.
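A single-process sketch of this queue logic (the class name and method conventions are illustrative assumptions; real processes would exchange REQUEST/ACK messages over a reliable multicast rather than calling methods directly):

```python
import heapq

class LamportMutex:
    """One process's view of Lamport's mutual exclusion (simplified sketch)."""
    def __init__(self, pid, peers):
        self.pid = pid
        self.peers = set(peers)
        self.clock = 0
        self.queue = []          # priority queue of (timestamp, pid) requests
        self.acks = set()

    def request(self):
        # Timestamp our request and enqueue it locally; the returned pair
        # would be broadcast to every peer.
        self.clock += 1
        heapq.heappush(self.queue, (self.clock, self.pid))
        self.acks = set()
        return (self.clock, self.pid)

    def on_request(self, ts, sender):
        # Merge the Lamport clock and enqueue the peer's request;
        # an acknowledgement is sent back immediately.
        self.clock = max(self.clock, ts) + 1
        heapq.heappush(self.queue, (ts, sender))

    def on_ack(self, sender):
        self.acks.add(sender)

    def may_enter(self):
        # Enter the critical section only when our request heads the
        # queue and every peer has acknowledged it.
        return (bool(self.queue) and self.queue[0][1] == self.pid
                and self.acks == self.peers)
```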

Ricart and Agrawala: a process which wants to access a resource must send a timestamped message to all other processes and wait for responses from all of them. A receiving process that holds the resource delays its response until it has released the resource. If two processes are requesting concurrently, the one with the earlier (least recent) timestamp gets priority.
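A minimal single-process sketch of the Ricart-Agrawala deferral logic (class name, return-value conventions, and the absence of real message passing are all simplifying assumptions):

```python
class RAProcess:
    """One process's view of Ricart and Agrawala (simplified sketch)."""
    def __init__(self, pid, peers):
        self.pid = pid
        self.peers = set(peers)
        self.clock = 0
        self.requesting = False
        self.req_ts = None
        self.deferred = []   # replies withheld while we hold/want the resource
        self.replies = set()

    def request(self):
        # Timestamp our request; the returned (clock, pid) pair would be
        # broadcast as a REQUEST to every peer.
        self.clock += 1
        self.requesting = True
        self.req_ts = (self.clock, self.pid)
        self.replies = set()
        return self.req_ts

    def on_request(self, ts, sender):
        # Reply immediately unless our own pending request is earlier;
        # ties break on pid via tuple comparison.
        self.clock = max(self.clock, ts[0]) + 1
        if self.requesting and self.req_ts < ts:
            self.deferred.append(sender)   # defer the reply
            return None
        return sender                      # send REPLY now

    def on_reply(self, sender):
        self.replies.add(sender)

    def may_enter(self):
        # Safe to enter once every peer has replied.
        return self.requesting and self.replies == self.peers

    def release(self):
        # Leaving the critical section releases all deferred replies.
        self.requesting = False
        out, self.deferred = self.deferred, []
        return out
```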

Election Algorithms

The goal of election algorithms is to get exactly one process selected as the “leader” of a group of other processes.

Bully: selects the process with the largest ID as the leader/coordinator.
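The outcome can be sketched as a toy simulation (the `alive` set is an assumption standing in for which processes actually answer election messages):

```python
def bully_elect(initiator, alive):
    """Sketch of the Bully election from one process's view.

    `initiator` starts the election; `alive` is the set of process IDs
    that respond to messages (failed processes are simply absent).
    """
    higher = [p for p in alive if p > initiator]
    if not higher:
        # No higher-ID process answered: the initiator wins.
        return initiator
    # Otherwise the election propagates upward; the highest live ID
    # eventually bullies everyone else and becomes coordinator.
    return max(higher)
```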

Ring Election:

Chang and Roberts


Goal: have a group of processes agree on a common value

Consensus must satisfy four goals:

Two main consensus algorithms:

To fix the issue of too many proposers with requests

Locks vs Leases

A lease is a lock w/ an expiration time.
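A minimal sketch of the distinction, assuming a monotonic local clock (the class and field names are illustrative):

```python
import time

class Lease:
    """A lock that automatically expires after ttl seconds (sketch)."""
    def __init__(self, holder, ttl):
        self.holder = holder
        # Monotonic clock avoids problems from wall-clock adjustments.
        self.expires = time.monotonic() + ttl

    def is_valid(self, now=None):
        # Unlike a plain lock, validity must be re-checked against time.
        now = time.monotonic() if now is None else now
        return now < self.expires
```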

Distributed Transactions

Commit Protocols

Two-Phase Commit Protocol

Phase 1:

Phase 2:

Three-Phase Commit Protocol

Three-phase commit is similar to two-phase commit except that it allows timeouts.

Phase 1:

Phase 2:

Phase 3:

Three-phase commit allows 2 things:

  1. It enables the use of a recovery coordinator
  2. Every phase can now time out, instead of waiting indefinitely

CAP Theorem

You can only have two of:

A cousin of ACID: sometimes we want BASE (Basically Available, Soft state, Eventual consistency).

Concurrency Control

Goal: allow multiple transactions to run concurrently, but ensure that any resource access is controlled to give results identical to those of a serial execution. This preserves the Isolation guarantee of ACID.

Distributed Deadlock



Chubby (Apache ZooKeeper)

Parallel File Systems

Distributed Lookup (Hash Tables)

Exam 3 Notes

Google MapReduce (and Apache Hadoop)

Traditional programming is typically serial

The challenge with this is that we need to identify which tasks can be run concurrently and which must run serially. Not all problems can be parallelized.

In the simplest case there is no dependency among the data - meaning that data can be split into equal sized chunks and processed.

One of the first widely used frameworks to accomplish general-purpose parallelization was MapReduce.

MapReduce is based on two distinct functions - map and reduce

Google’s MapReduce was great because it


Map (in MapReduce) splits data into shards (a process known as partitioning), where each shard is processed on a single machine. The framework then aggregates the emitted values and sorts them in an intermediate phase before passing them to reduce.

Reduce then takes the sorted intermediate K/V pairs and aggregates them into some kind of result. The K/V pairs are mapped to reduce workers via a partitioning function using a hash (hash(key) mod R, where R is the number of partitions).
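The partitioning function can be sketched as follows (the notes only specify hash(key) mod R; md5 here is an illustrative choice of deterministic hash):

```python
import hashlib

def partition(key, R):
    """Assign an intermediate key to one of R reduce partitions.

    All occurrences of the same key hash to the same partition, so a
    single reduce worker sees every value for that key.
    """
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % R
```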

So, in all, the data flow is:

Input files > shards > map workers > intermediate files > reduce workers > output files

Within the map worker there are two functions

Within the reduce worker there are two functions:

In detail, the steps are as follows:

  1. Split input files into chunks (typically around 64MB)
  2. Fork processes/start up copies of workers on many machines
    • 1 master scheduler and coordinator
    • Idle workers are assigned either a map or reduce task
  3. Map task
    • Reads the contents of its assigned input shards
    • Parses KV pairs
    • Passes each pair to a user-defined map function
  4. Create intermediate files
    • Intermediate files are buffered in memory and periodically written to the local disk
    • Partitioned into R regions; the map worker notifies the master when complete
  4a. Map data is processed by the reduce workers
    • All KV pairs are SORTED
    • A partition function decides which of the R reduce workers will work on which key
    • Each reduce worker reads its partition from EVERY map worker
  5. Reduce task: sorting
    • Reduce workers are notified by master about location of intermediate files for its partition
    • Uses RPCs to read data from disks of map workers
    • When reading the data, the reduce worker sorts it by the intermediate key, and occurrences of the same key are grouped together.
  6. Reduce Task: reducing
    • The reduce function is given a key and set of values for a key, it then writes the results to a file.
  7. Return data to the user
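The steps above can be sketched with the classic word-count example (a single-process simulation: the function names are assumptions, and the in-memory sort stands in for the distributed shards, workers, and intermediate files):

```python
from itertools import groupby

# User-defined map function: emit (word, 1) for each word in a shard.
def map_fn(shard):
    for word in shard.split():
        yield (word, 1)

# User-defined reduce function: aggregate all values for one key.
def reduce_fn(key, values):
    return (key, sum(values))

def map_reduce(shards, map_fn, reduce_fn):
    # Map phase: in a real cluster each shard runs on its own worker.
    intermediate = [kv for shard in shards for kv in map_fn(shard)]
    # Shuffle/sort phase: sorting groups identical keys together.
    intermediate.sort(key=lambda kv: kv[0])
    # Reduce phase: each key and its grouped values go to reduce_fn.
    return [reduce_fn(k, [v for _, v in group])
            for k, group in groupby(intermediate, key=lambda kv: kv[0])]
```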

Fault Tolerance:



Distributed grep

Count URL Access Frequency

Reverse web-link graph

Inverted Index

As great as MapReduce is, all is not perfect. MapReduce uses batch processing, and jobs can take hours to run.

In practice, data is also typically not simple files. It is stored as B-Trees, tables, databases, memory mapped KV pairs

Textual data is also rarely used. Google’s protobuf is one widely used framework to convert textual data/objects into binary formats.

BigTable (and Apache HBase)

BigTable is a highly available distributed storage system for semi-structured data, i.e.

BigTable runs on a large scale (petabytes). It can store billions of URLs with many versions on each page, hundreds of millions of clients, thousands of queries per second. It stores 100+TB of imagery

It is used for many google services.

It is NOT a relational database like typical SQL databases. (Though it does appear as a table like many traditional databases)

A simple example of how a table is structured is shown below

        col 2    col 3
key 1   t1, t2   t1, t2, t3, t4
key 2   t2, t3   t1

where each cell stores data at different timestamps, t1, t2, t3, etc.

Table Splitting

Tables in BigTable start as a single tablet; as a table grows, it splits into multiple tablets. That is, it selects a split point and puts the rows before the point into one tablet and the rows after the same point into another tablet.

Column Families: group of column keys, the basic unit of data access


API Operations

The implementation of BigTable is supported by

Bigtable is implemented using many tablet servers - these coordinate requests to tablets.

BigTable assigns one tablet server as a master. It

BigTable also has a METADATA table which stores information about BigTable itself.

Every tablet is assigned to one tablet server at a time.

Chubby is responsible for keeping track of the tablet servers. Each tablet server creates and acquires a lock on a uniquely named file in a Chubby servers directory. The master monitors this directory to discover new tablet servers.

When the master starts:

Fault Tolerance

BigTable vs Dynamo

Google Spanner

Spanner adds:

Generally, Spanner is

The goal of Spanner is to make BigTable easy for programmers to use.

Working with eventual consistency and merging is hard; don’t make developers deal with it!

Data Storage in Spanner

The universe holds one or more databases

The databases hold one or more tables, where tables are an arbitrary number of rows and columns

Shards and tablets are replicated synchronously with Paxos and transactions across shards use the 2-phase commit protocol

A directory is a set of contiguous keys which is used as a unit of data allocation and provides granularity for data movement between Paxos groups.


All transactions follow ACID and use strict 2-phase locking.

  1. Acquire all locks
  2. Do work
  3. Get a commit timestamp
  4. Log the commit timestamp via Paxos to a majority of replicas
  5. Do the commit
    • Apply changes locally and to replicas
  6. Release the locks

However, 2-phase locking can be slow, so we use separate read and write locks.

Multiversion concurrency control saves us by taking a snapshot of the database for transactions up to a point in time. It then allows us to read old data without acquiring a lock (good for long-running reads, e.g. searching).

This means we must have commit timestamps that enable meaningful snapshots.

Getting Good Commit Timestamps


Commit Wait

  1. Acquire locks
  2. Do work
  3. Get a commit timestamp t = TT.now().latest
  4. Commit WAIT: until TT.now().earliest > t
  5. Commit
  6. Release locks
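The commit-wait step above can be sketched on a single machine, assuming a TrueTime-style interval clock with a fixed uncertainty bound (the EPS value and function names are illustrative; real TrueTime derives its bounds from GPS and atomic clocks):

```python
import time

EPS = 0.007  # assumed clock uncertainty of 7 ms, an illustrative value

def tt_now():
    """TrueTime-style interval: the true time lies in [earliest, latest]."""
    t = time.time()
    return (t - EPS, t + EPS)

def commit_wait():
    # Commit timestamp is the latest possible current time.
    _, latest = tt_now()
    t_commit = latest
    # Wait until t_commit is guaranteed to be in the past on every
    # machine, no matter whose clock is ahead or behind.
    while tt_now()[0] <= t_commit:
        time.sleep(EPS / 2)
    return t_commit
```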

If we want to integrate data replication and concurrency control, we can create the replicas of our data during the commit wait.

Spanner Conclusion

Other Parallel Frameworks

Apache Pig

Used via grunt, the Pig shell. It allows you to submit scripts directly to Pig. Pig then compiles to MapReduce jobs.

Generally in the Pig framework (written in PigLatin):

  1. submit script
  2. Pig framework:
    • Parses, checks, optimizes, plans execution, submits a jar to Hadoop, and monitors progress
  3. Hadoop executes Map and Reduce functions.

Pig loads data via PigStorage, BinStorage, BinaryStorage, Textloader, or PigDump

However, while MapReduce is a great parallel framework, it doesn’t scale well for all problems.

For many other problems it will require multiple iterations and the data locality is typically not preserved between Map and Reduce jobs. This means there is lots of communication between map and reduce workers.

Bulk Synchronous Parallel (BSP)

BSP is a computing model for parallel computations.

Concurrent Computation


Barrier Synchronization

BSP Implementation: Apache Hama

Hama Programming

Graph Computing


Graph computation is a difficult problem on a large scale

Pregel: A vertex-centric BSP (Apache Giraph)

More examples of graph computations:

Pregel Locality

Pregel API

Advanced Pregel API operations

To run Pregel, many copies of the program must be started within the cluster of machines

One copy of the program becomes the master

The cluster’s nameserver uses Chubby

Apache Giraph is the Open Source version of Pregel

Apache Spark - Generalizing MapReduce

Spark uses a generic data storage interface

Spark also uses a more general functional programming model based on transformations and actions

With regards to MapReduce:

At a high level, Spark uses a DAG to plan and execute jobs

The cluster manager breaks a job into tasks and sends the tasks to workers where the data lives.

Worker Node

Data and RDDs

Operations on RDDs

Spark does not care how data is stored. The RDD connector determines how to read the data

RDDs are also fault tolerant and track the sequence of transformations used to create them. This enables recomputing lost data if a node fails.

Spark Streaming

MapReduce and Pregel expect static data. Spark Streaming enables processing of live data streams: data is chunked into batches and processed via the Spark engine.

Content Delivery Networks

Serving web content from one location presents many problems: scalability, reliability, and performance, to name a few.

We also have to deal with the flash crowd problem. What if everyone comes to your site at one time?

CDN’s cache content and service requests from multiple servers at the network edge (close to the user)


CDNs focus on content

Loading a webpage is similar to a memory hierarchy, where we have CPU cache, RAM, and disk. To load a page, browsers use a content cache, then a CDN, and finally the host’s own web server.

One function that CDNs provide is load balancing: by increasing the number of servers that can be reached for content, they offload work that would otherwise go to a single web server.



To improve internet scalability, availability, and performance we can mirror servers for load balancing; this improves scalability and availability. Lastly, we can cache content and serve requests from multiple servers at the network edge, which is close to the user.

This reduces demand on the site’s infrastructure and provides faster service to users by reducing network latency (RTT).

Some of these approaches have problems

All of these require extra capacity and capital costs.

Akamai Distributed Caching

Akamai is a company which evolved from MIT research. It aimed at tackling the “flash crowd” problem.

Delivers approximately 15-30% of all web traffic (over 30 terabits/second)

The goal of Akamai is to serve clients from the nearest, most available server that is likely to have the content the user is requesting.

Because the internet is a collection of many autonomous networks, connectivity is based on business decisions and peering agreements, not simply performance of the servers

Akamai’s overlay network is a collection of caching servers at many, many ISPs which all know about one another.

Akamai Overlay Network

Other uses for CDNs

Limelight Orchestrate

A service focused on video distribution and content management

Clusters, Cryptography, Authentication, and Authorization


Computer System Design

Cryptographic Systems

Cryptography is not the same as security

Cryptography is good for:

Authentication and Authorization

Multi-factor authentication

Two passwords are not two-factor authentication!

PAP: Password authentication protocol

Dictionary Attacks

Speed up dictionary attacks by using a table of pre-computed hashes.
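A toy illustration of why pre-computation helps (the word list and hash choice are illustrative assumptions; real attacks use far larger dictionaries and rainbow tables, and salting defeats this shortcut):

```python
import hashlib

def h(password):
    """Unsalted password hash, as a vulnerable server might store it."""
    return hashlib.sha256(password.encode()).hexdigest()

# The attacker hashes a dictionary of likely passwords once, up front...
dictionary = ["password", "letmein", "123456", "hunter2"]
precomputed = {h(p): p for p in dictionary}

# ...after which cracking any stolen unsalted hash is a single lookup
# instead of re-hashing the whole dictionary per stolen hash.
def crack(stolen_hash):
    return precomputed.get(stolen_hash)
```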

Man in the Middle Attacks (MITM)

Combined Authentication and Key Exchange

There is now a secure communication channel


Distributed Systems - zac blanco