# Partitions and Executors in MapReduce and Beyond

Author: Mahmoud Parsian
Last updated: 9/28/2022

## Introduction to MapReduce and Beyond

MapReduce is a parallel programming model
and an associated implementation introduced
by Google. In this programming model, a user
specifies the computation with two functions,
`map()` and `reduce()`. The MapReduce paradigm
is implemented by many systems:

* Google MapReduce (proprietary, not open sourced)
* Apache Hadoop (open source)
* Apache Tez (open source)
* Apache Spark (open source)
    * Spark is a superset of MapReduce: it supports a
      superset of the `map()` and `reduce()` functions
      (see the sketch after this list) and generally
      performs analytics much faster than Hadoop
    * Spark utilizes RAM/memory, while Hadoop is
      mostly disk-based
    * Spark is a multi-language engine for executing
      data engineering, data science, and machine
      learning on single-node machines or clusters
    * Spark is often preferred over Hadoop (Spark can be
      10x to 100x faster than Hadoop)

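To make `map()` and `reduce()` concrete, here is a
minimal PySpark word-count sketch (the file name
`words.txt` and the word-count logic are illustrative
assumptions, not part of the text above):

```python
# A minimal word-count sketch: map() emits (key, value) pairs
# and reduceByKey() aggregates the values per key.
# Assumption (for illustration): a local text file "words.txt".
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("wordcount").getOrCreate()
sc = spark.sparkContext

counts = (
    sc.textFile("words.txt")
      .flatMap(lambda line: line.split())  # split lines into words
      .map(lambda word: (word, 1))         # map(): emit (key, value)
      .reduceByKey(lambda a, b: a + b)     # reduce(): sum counts per key
)
print(counts.collect())
spark.stop()
```
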
## Parallelism in MapReduce-based Systems

The MapReduce programming model was created
for processing data that exhibits "DATA
PARALLELISM": the ability to compute multiple
independent operations in any order. When the
MapReduce system receives a job, it first divides
the job's input data into data blocks of roughly
equal size (that is, it partitions the data into
smaller chunks called partitions). Each `map()`
task is then responsible for processing one data
block (one partition). All `map()` tasks (mappers)
execute at the same time and independently of one
another, forming parallel processing of the data.
Likewise, all `reduce()` tasks (reducers) execute
at the same time and independently, forming parallel
processing of the reducers. Therefore, MapReduce
enables parallel and distributed processing: a job
can be divided into subtasks, and those subtasks
can then be processed in parallel.

Note that Apache Spark is a superset of MapReduce
and goes beyond it. When a task is parallelized in
Spark, it means that concurrent tasks may be running
on the driver node or on worker nodes. How the task
is split across the different nodes in the cluster
depends on the types of data structures and libraries
that you are using.

The concept of data partitioning applies in much
the same way to both MapReduce and Spark systems.


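As a small PySpark sketch of this idea (the data and
the partition count are illustrative assumptions), you
can create a dataset with an explicit number of
partitions and watch each partition being processed
independently:

```python
# A minimal sketch of data parallelism: the data is split into
# partitions, and each partition is processed independently
# (here, by counting its elements). The numbers are illustrative.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitions").getOrCreate()
sc = spark.sparkContext

# Split 1,000,000 numbers into 8 partitions.
rdd = sc.parallelize(range(1_000_000), numSlices=8)
print(rdd.getNumPartitions())  # 8

# Each (index, count) pair is computed from one partition,
# independently of all the other partitions.
def count_in_partition(index, iterator):
    yield (index, sum(1 for _ in iterator))

print(rdd.mapPartitionsWithIndex(count_in_partition).collect())
spark.stop()
```
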
## Input:

Your input is partitioned into chunks called partitions.
For example, if you have `80,000,000,000` records (data points)
and you partition them into `40,000` chunks, then:

* The number of partitions: `40,000`
* Approximate number of elements per partition: `2,000,000`
* `40,000 x 2,000,000 = 80,000,000,000`
* Let's label partitions as `P_1`, `P_2`, ..., `P_40000`

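As an illustrative PySpark sketch of this setup (the
input path `records.parquet` is an assumption, not from
the text), you could request `40,000` partitions
explicitly and verify the count:

```python
# An illustrative sketch: repartitioning a large dataset into
# 40,000 partitions, as in the example above. The input path
# "records.parquet" is an assumption for illustration.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("input-partitions").getOrCreate()

df = spark.read.parquet("records.parquet")  # ~80,000,000,000 records
df = df.repartition(40_000)                 # P_1, P_2, ..., P_40000

# Each partition now holds roughly 2,000,000 records.
print(df.rdd.getNumPartitions())            # 40000
spark.stop()
```
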
## Cluster:

Now assume that you have a cluster of 4 nodes:
one master (denoted by M) and 3 worker nodes
(W1, W2, W3). Therefore, our example cluster is
denoted as `C = {M, W1, W2, W3}`. Also, note that
we assume here that the master node acts as a
cluster manager and does not execute any
transformations (such as mappers, filters, reducers,
and so on); it only manages the cluster's activities
and functionality. Further, assume that each worker
node can host 4 executors, so the total number of
available executors is `12 = 3 x 4`. The number of
executors depends on the size and power of a worker
node: a really powerful worker node (with lots of RAM
and CPU) may even be able to handle 10 to 16 executors.

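As a sketch, you might request this executor layout
explicitly when building a Spark session (the memory
and core values below are illustrative assumptions; a
fixed executor count like this applies to resource
managers such as YARN):

```python
# A minimal sketch: asking Spark for 12 executors (3 worker
# nodes x 4 executors each, as in the example above). The
# memory/core values are illustrative assumptions.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .appName("cluster-demo")
        .config("spark.executor.instances", "12")  # 3 workers x 4 executors
        .config("spark.executor.cores", "4")       # cores per executor (assumed)
        .config("spark.executor.memory", "8g")     # memory per executor (assumed)
        .getOrCreate()
)
```
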
## Distributing Partitions to Worker Nodes

The question is: how will the cluster manager
distribute and execute 40,000 partitions
among 3 worker nodes? (In our example, we have
4 executors per worker node, and each executor
can execute a mapper, filter, or reducer.) Let's
assume that these `40,000` partitions are queued
to be processed by the cluster manager. Let's
label our executors as:

    E = {E_1, E_2, E_3, ..., E_12}

Let's say that the first transformation is a
mapper (i.e., a `map()` function); basically, a
`map()` function receives a single partition and
then emits a set of (key, value) pairs. In the
first iteration, 12 partitions are assigned to E
(i.e., to the 12 executors: each executor gets a
single partition and executes the `map()` function
on that given partition). When an executor finishes
executing `map(single_partition)`, it sends the
result to the cluster manager, and the cluster
manager then assigns it another partition from the
queue. This process continues until all partitions
are exhausted. Therefore, at any one time, 12
executors are working (executing the `map()`
function) in parallel and independently. The more
worker nodes we have, the faster the whole job
completes.

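The scheduling loop described above can be sketched
in plain Python (this simulation is an illustrative
model of the queue-and-executors idea, not how Spark's
scheduler is actually implemented):

```python
# An illustrative simulation of the scheduling model above:
# 40,000 queued partitions and 12 executors, where each idle
# executor pulls the next partition until the queue is empty.
from concurrent.futures import ThreadPoolExecutor

NUM_PARTITIONS = 40_000
NUM_EXECUTORS = 12

def run_mapper(partition_id: int) -> int:
    # Placeholder for map(single_partition); a real mapper would
    # read the partition's records and emit (key, value) pairs.
    return partition_id

# The pool plays the role of the 12 executors; its internal work
# queue plays the role of the partition queue.
with ThreadPoolExecutor(max_workers=NUM_EXECUTORS) as pool:
    results = list(pool.map(run_mapper, range(NUM_PARTITIONS)))

print(f"processed {len(results)} partitions")
```
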
Given our initial cluster C, if, for example, it
takes T seconds to complete the execution of 40,000
partitions with 3 worker nodes `{W1, W2, W3}`, then:

* Adding 3 more worker nodes `{W4, W5, W6}` (for a
total of 6) will reduce the execution time to about `T/2`.

* If we increase the number of worker nodes to 9 (one
master node and 9 worker nodes), then the elapsed time
will be reduced to about `T/3`.
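
Assuming ideal linear scaling (a simplifying
assumption; shuffles and coordination overhead reduce
it in practice), the expected time with `n` worker
nodes is roughly `T x 3 / n`:

```python
# Back-of-the-envelope scaling estimate, assuming ideal linear
# speedup: with 3 workers the job takes T seconds, so with n
# workers it takes roughly T * 3 / n. The value of T below is
# an illustrative assumption.
T = 900.0  # measured seconds with 3 workers (illustrative)

for n in (3, 6, 9):
    print(f"{n} workers: ~{T * 3 / n:.0f} seconds")

# 3 workers: ~900 seconds  (T)
# 6 workers: ~450 seconds  (about T/2)
# 9 workers: ~300 seconds  (about T/3)
```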