[FUTURE][SYSTEMML-1160] [WIP] Enable prefetching of batches via for loop iterator #389

Open
niketanpansare wants to merge 3 commits into base: main from for_iterator1

Conversation

@niketanpansare (Contributor) commented Feb 13, 2017

This PR extends the current for construct to support iterating over batches. The current version only supports iterating over batches of rows.

The syntax

for(X_batch in X, nrow=batch_size) {
  ...
}

is equivalent to

iters = ceil(N / batch_size)
for(i in 1:iters) {
  # Get next batch
  beg = ((i-1) * batch_size) %% N + 1
  end = min(N, beg + batch_size - 1)
  X_batch = X[beg:end,]
}
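For example, with N = 100 rows and batch_size = 30, this yields iters = ceil(100/30) = 4 batches: X[1:30,], X[31:60,], X[61:90,], and a final partial batch X[91:100,].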

Note: This PR does not introduce rectangular blocks in SystemML; instead, the batching is handled in the ForProgramBlock. To enable prefetching, the user just has to set a config flag:

ml.setConfigProperty("prefetch.budget.mb", "300"); // 300 mb prefetch memory
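
For intuition on what the prefetcher buys, here is a minimal, self-contained Java sketch of the double-buffering pattern: while the loop body processes batch i, a background thread already fetches batch i+1, so only the first batch incurs a synchronous wait. All names here (PrefetchSketch, fetchBatch, processBatch) are hypothetical placeholders, not SystemML APIs; the PR itself implements this inside the ForProgramBlock against the memory budget set above.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of double-buffered batch prefetching.
public class PrefetchSketch {

  // Simulates the (potentially remote/Spark-backed) indexed read X[beg:end,],
  // using 1-based row indices as in DML.
  static double[][] fetchBatch(double[][] X, int beg, int end) {
    double[][] batch = new double[end - beg + 1][];
    for (int r = beg; r <= end; r++)
      batch[r - beg] = X[r - 1];
    return batch;
  }

  // Placeholder for the loop body (e.g., one mini-batch SGD step).
  static void processBatch(double[][] batch) { }

  public static void main(String[] args) throws Exception {
    int N = 1000, batchSize = 50;
    double[][] X = new double[N][10];
    int iters = (int) Math.ceil((double) N / batchSize);

    ExecutorService prefetcher = Executors.newSingleThreadExecutor();
    Future<double[][]> next = null;
    for (int i = 1; i <= iters; i++) {
      int beg = (i - 1) * batchSize % N + 1;
      int end = Math.min(N, beg + batchSize - 1);
      // Only the first batch is fetched synchronously; afterwards we just
      // wait (usually not at all) for the prefetch started last iteration.
      double[][] batch = (next != null) ? next.get() : fetchBatch(X, beg, end);
      if (i < iters) {
        // Kick off the fetch of the following batch before processing.
        int nbeg = i * batchSize % N + 1;
        int nend = Math.min(N, nbeg + batchSize - 1);
        next = prefetcher.submit(() -> fetchBatch(X, nbeg, nend));
      }
      processBatch(batch);
    }
    prefetcher.shutdown();
  }
}

If the loop body takes longer than one fetch, every get() after the first returns immediately, which matches the Batch-Fetch statistics reported below (wait time is essentially just the first fetch).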

@dusenberrymw @mboehm7 @bertholdreinwald @frreiss @asurve

@niketanpansare (Contributor, Author)

I will run a few more experiments and update this PR soon. :)

@dusenberrymw (Contributor) left a comment

@niketanpansare This is great! In particular, it's quite encouraging to see that we can gain a good improvement by simply introducing native support for iterating over batches, rather than having to rely on the other possible approach of explicit, external reblocking. This approach is much more in line with the vision of SystemML -- simple syntax that allows the user to focus on ML, while delivering performance transparently. As for the prefetching, I think it's much more likely that we will benefit with a larger model, as a single iteration of the softmax model is quite fast and does not leave much time to fetch data in parallel. Can you also compare this to the explicit reblocking approach just to see if we could benefit even more by some internal reblocking given the presence of this new for iterator in a script?

@niketanpansare force-pushed the for_iterator1 branch 4 times, most recently from 8da4f8d to e701060 on February 16, 2017 at 23:57.
@niketanpansare (Contributor, Author) commented Feb 17, 2017

Approach 1: With Prefetching

The statistics below show that for a convnet on a 200K-column dataset with a batch size of 50, we only wait for the fetch of the first batch (~41 seconds); for the remaining batches, the wait time is almost zero. I also did sanity testing using System.out prints and double-checked that the Spark job executed in parallel while conv2d was executing.

ml.setConfigProperty("prefetch.budget.mb", "300"); // 300 mb prefetch memory

for (e in 1:epochs) {
  i = 0
  for(X_batch in X, nrow=batch_size) {
    i = i + 1
    if(i == 100) {
      stop("Early stopping for benchmarks")
    }

    # Get next batch
    beg = ((i-1) * batch_size) %% N + 1
    end = min(N, beg + batch_size - 1)
    y_batch = Y[beg:end,]
    ....
  }
}

time: 1384233.345149ms
SystemML Statistics:
Total elapsed time:             0.000 sec.
Total compilation time:         0.000 sec.
Total execution time:           0.000 sec.
Number of compiled Spark inst:  76.
Number of executed Spark inst:  3.
Batch-Fetch (next,indx,wait,first):     41.586/355.839/41.585/41.583 sec.
Cache hits (Mem, WB, FS, HDFS): 19796/0/0/1.
Cache writes (WB, FS, HDFS):    4767/139/0.
Cache times (ACQr/m, RLS, EXP): 0.090/0.018/30.515/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/595.
HOP DAGs recompile time:        1.699 sec.
Functions recompiled:           1.
Functions recompile time:       0.164 sec.
Spark ctx create time (lazy):   0.017 sec.
Spark trans counts (par,bc,col):0/0/102.
Spark trans times (par,bc,col): 0.000/0.000/355.286 secs.
Total JIT compile time:         172.599 sec.
Total JVM GC count:             773.
Total JVM GC time:              123.818 sec.
Heavy hitter instructions (name, time, count):
-- 1)   conv2d_bias_add         817.763 sec     297
-- 2)   sel+    196.118 sec     396
-- 3)   -       45.240 sec      2178
-- 4)   conv2d_backward_filter  44.979 sec      297
-- 5)   relu_maxpooling         44.417 sec      297
-- 6)   +*      41.579 sec      1485
-- 7)   r'      28.868 sec      495
-- 8)   conv2d_backward_data    28.579 sec      198
-- 9)   maxpooling_backward     21.912 sec      297
-- 10)  *       19.874 sec      2880

Approach 2: Maxi-batch

for (e in 1:epochs) {
  maxi_batch_Size = 20*batch_size
  maxi_iters = ceil(N / maxi_batch_Size)
  stopping_i = 0
  for(maxi in 1:maxi_iters) {
    # Get next maxi-batch
    maxi_beg = ((maxi-1) * maxi_batch_Size) %% N + 1
    maxi_end = min(N, maxi_beg + maxi_batch_Size - 1)
    X_maxibatch = X[maxi_beg:maxi_end,]
    y_maxibatch = Y[maxi_beg:maxi_end,]
    maxi_N = nrow(y_maxibatch)
    iters = ceil(maxi_N / batch_size)
    for(i in 1:iters) {
      stopping_i = stopping_i + 1
      if(stopping_i == 100) {
        stop("Early stopping for benchmarks")
      }
      # Get next batch
      beg = ((i-1) * batch_size) %% maxi_N + 1
      end = min(maxi_N, beg + batch_size - 1)
      X_batch = X_maxibatch[beg:end,]
      y_batch = y_maxibatch[beg:end,]
      ....
    }
  }
}

time: 2334624.686333ms
SystemML Statistics:
Total elapsed time:             0.000 sec.
Total compilation time:         0.000 sec.
Total execution time:           0.000 sec.
Number of compiled Spark inst:  81.
Number of executed Spark inst:  8.
Cache hits (Mem, WB, FS, HDFS): 19895/0/0/6.
Cache writes (WB, FS, HDFS):    7403/64/0.
Cache times (ACQr/m, RLS, EXP): 80.049/0.025/33.767/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/605.
HOP DAGs recompile time:        1.907 sec.
Functions recompiled:           1.
Functions recompile time:       0.303 sec.
Spark ctx create time (lazy):   0.026 sec.
Spark trans counts (par,bc,col):0/0/5.
Spark trans times (par,bc,col): 0.000/0.000/79.953 secs.
Total JIT compile time:         83.437 sec.
Total JVM GC count:             1441.
Total JVM GC time:              254.973 sec.
Heavy hitter instructions (name, time, count):
-- 1)   conv2d_bias_add         780.249 sec     297
-- 2)   conv2d_backward_filter  442.185 sec     297
-- 3)   conv2d_backward_data    304.401 sec     198
-- 4)   sel+    200.346 sec     396
-- 5)   uack+   115.980 sec     495
-- 6)   maxpooling_backward     107.120 sec     297
-- 7)   rangeReIndex    85.561 sec      203
-- 8)   relu_backward   61.133 sec      495
-- 9)   ba+*    43.830 sec      594
-- 10)  relu_maxpooling         42.892 sec      297

I used the same cluster and same configuration as our VLDB experiments.

@bertholdreinwald @mboehm7 @dusenberrymw

@dusenberrymw (Contributor)

@niketanpansare Thanks for running this experiment. A 1.7x speedup certainly shows that our current methods are leaving a large amount of performance on the table.

@mboehm7 (Contributor) commented Feb 18, 2017

These numbers are a bit puzzling. If there is a 1000s difference, how come rangeReIndex (which also covers the lazy Spark indexing) only requires 85s (of which the 80s for ACQ give us an upper bound on the time for collecting the indexed data)?

@niketanpansare (Contributor, Author) commented Feb 18, 2017 via email

@niketanpansare (Contributor, Author)

Here are the statistics on a different cluster with 70 GB driver memory:

Approach 1: With Prefetching

time: 834467.702908ms
SystemML Statistics:
Total elapsed time:             0.000 sec.
Total compilation time:         0.000 sec.
Total execution time:           0.000 sec.
Number of compiled Spark inst:  75.
Number of executed Spark inst:  3.
Batch-Fetch (next,indx,wait,first):     61.151/168.933/61.151/61.149 sec.
Cache hits (Mem, WB, FS, HDFS): 19796/0/0/1.
Cache writes (WB, FS, HDFS):    7506/3/0.
Cache times (ACQr/m, RLS, EXP): 0.110/0.010/3.239/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/595.
HOP DAGs recompile time:        0.985 sec.
Functions recompiled:           1.
Functions recompile time:       0.086 sec.
Spark ctx create time (lazy):   0.013 sec.
Spark trans counts (par,bc,col):0/0/102.
Spark trans times (par,bc,col): 0.000/0.000/168.039 secs.
Total JIT compile time:         54.711 sec.
Total JVM GC count:             508.
Total JVM GC time:              96.755 sec.
Heavy hitter instructions (name, time, count):
-- 1)   conv2d_bias_add         206.028 sec     297
-- 2)   conv2d_backward_filter  157.850 sec     297
-- 3)   sel+    110.459 sec     396
-- 4)   conv2d_backward_data    70.346 sec      198
-- 5)   uack+   50.411 sec      495
-- 6)   maxpooling_backward     50.044 sec      297
-- 7)   relu_backward   28.905 sec      495
-- 8)   relu_maxpooling         21.635 sec      297
-- 9)   ba+*    20.543 sec      594
-- 10)  +*      15.317 sec      1485

Approach 2: Maxi-batch

time: 915534.711838ms
SystemML Statistics:
Total elapsed time:             0.000 sec.
Total compilation time:         0.000 sec.
Total execution time:           0.000 sec.
Number of compiled Spark inst:  80.
Number of executed Spark inst:  8.
Cache hits (Mem, WB, FS, HDFS): 19796/0/0/6.
Cache writes (WB, FS, HDFS):    7438/1/0.
Cache times (ACQr/m, RLS, EXP): 138.341/0.009/3.571/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/605.
HOP DAGs recompile time:        0.900 sec.
Functions recompiled:           1.
Functions recompile time:       0.146 sec.
Spark ctx create time (lazy):   0.013 sec.
Spark trans counts (par,bc,col):0/0/5.
Spark trans times (par,bc,col): 0.000/0.000/138.253 secs.
Total JIT compile time:         41.224 sec.
Total JVM GC count:             574.
Total JVM GC time:              102.319 sec.
Heavy hitter instructions (name, time, count):
-- 1)   conv2d_bias_add         196.187 sec     297
-- 2)   conv2d_backward_filter  160.273 sec     297
-- 3)   rangeReIndex    142.017 sec     203
-- 4)   sel+    108.674 sec     396
-- 5)   conv2d_backward_data    70.360 sec      198
-- 6)   maxpooling_backward     51.672 sec      297
-- 7)   uack+   49.394 sec      396
-- 8)   relu_backward   36.753 sec      396
-- 9)   relu_maxpooling         20.152 sec      297
-- 10)  +*      18.896 sec      1485

@mboehm7 (Contributor) commented Feb 21, 2017

Does this second cluster use 10Gb or 1Gb Ethernet?

@niketanpansare (Contributor, Author)

10 Gb Ethernet:

$ lspci | grep -iE --color 'network|ethernet'
01:00.0 Ethernet controller: Broadcom Limited NetXtreme II BCM57810 10 Gigabit Ethernet (rev 10)
01:00.1 Ethernet controller: Broadcom Limited NetXtreme II BCM57810 10 Gigabit Ethernet (rev 10)
16:00.0 Ethernet controller: Broadcom Limited NetXtreme BCM5719 Gigabit Ethernet PCIe (rev 01)
16:00.1 Ethernet controller: Broadcom Limited NetXtreme BCM5719 Gigabit Ethernet PCIe (rev 01)
16:00.2 Ethernet controller: Broadcom Limited NetXtreme BCM5719 Gigabit Ethernet PCIe (rev 01)
16:00.3 Ethernet controller: Broadcom Limited NetXtreme BCM5719 Gigabit Ethernet PCIe (rev 01)

@niketanpansare (Contributor, Author)

@bertholdreinwald @mboehm7 @dusenberrymw Any review updates for this PR?

Since it's been a while, here is a summary of the discussion related to this PR:

  1. This PR adds new syntax to the DML language, which is useful for implementing SGD-style algorithms:

for(X_batch in X, nrow=batch_size) {
  ...
}

  2. Additionally, it allows for prefetching of X_batch via the configuration property prefetch.budget.mb. The memory specified by this property is subtracted from the local memory budget, which could be a drawback in low-memory settings.

  3. An alternative is to compute the aggregate memory budget of all the child blocks and subtract it from the local memory budget to get the prefetch budget. In many cases, the aggregate memory budget will likely be a gross over-estimate of the actual memory required, as it does not take into account the effect of rmvar and our caching. We can extend ProgramBlock to compute this:

/**
 * Computes the memory estimate if known, else -1.
 * If the optimizer level is not memory-based, it returns OptimizerUtils.INVALID_SIZE.
 *
 * @return the memory estimate for this program block
 * @throws HopsException if an error occurs
 */
public double getMemEstimate() throws HopsException {
	if(_sb == null)
		return 0;
	if(!OptimizerUtils.isMemoryBasedOptLevel())
		return OptimizerUtils.INVALID_SIZE;

	double memBudget = 0;
	for(Hop h : _sb.get_hops()) {
		double memEstimateForH = h.getMemEstimate();
		if(memEstimateForH < 0)
			return -1;
		else
			memBudget += memEstimateForH;
	}
	return memBudget;
}

Alternatively, we can replace memBudget += memEstimateForH; with memBudget = Math.max(memBudget, memEstimateForH); as an optimistic estimate that assumes immediate rmvar and/or caching; see the sketch below.
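
For concreteness, here is a sketch of that optimistic variant. The method name is hypothetical; it mirrors getMemEstimate above and relies on the same _sb, Hop, and OptimizerUtils context.

/**
 * Optimistic memory estimate (hypothetical variant): assumes each
 * intermediate is freed (rmvar) or cached immediately, so the budget
 * is the largest single HOP estimate rather than the sum over all
 * HOPs in the statement block.
 */
public double getOptimisticMemEstimate() throws HopsException {
	if(_sb == null)
		return 0;
	if(!OptimizerUtils.isMemoryBasedOptLevel())
		return OptimizerUtils.INVALID_SIZE;

	double memBudget = 0;
	for(Hop h : _sb.get_hops()) {
		double memEstimateForH = h.getMemEstimate();
		if(memEstimateForH < 0)
			return -1;
		memBudget = Math.max(memBudget, memEstimateForH);
	}
	return memBudget;
}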

@mboehm7 (Contributor) commented May 4, 2017

I'm still not convinced by this PR: the experimental results were inconclusive (especially in comparison to simple rewrites), it does not fit nicely into our architecture of memory management (without unnecessary over-provisioning), it unnecessarily differs from R syntax, and the CP batch iterator might actually create more evictions in the presence of memory pressure. So I would recommend not rushing this in.

@akchinSTC (Contributor)

Refer to this link for build results (access rights to CI server needed):
https://sparktc.ibmcloud.com/jenkins/job/SystemML-PullRequestBuilder/1439/

@mboehm7 (Contributor) commented May 5, 2017

Additional things to think about: (1) handling of updates in terms of assignments or left indexing (we would need to handle this during validate), and (2) the impact on codegen, which is already able to fuse right indexing of column vectors into the subsequent fused operator.

@akchinSTC (Contributor)

Refer to this link for build results (access rights to CI server needed):
https://sparktc.ibmcloud.com/jenkins/job/SystemML-PullRequestBuilder/1725/

@j143 changed the title from "[SYSTEMML-1160] [WIP] Enable prefetching of batches via for loop iterator" to "[FUTURE][SYSTEMML-1160] [WIP] Enable prefetching of batches via for loop iterator" on Aug 5, 2020.