Skip to content

Commit 6b57753

Browse files
committed
Merge remote-tracking branch 'origin/master' into domoritz-visualization
Conflicts: conf/log4j.properties src/edu/washington/escience/myria/api/QueryResource.java src/edu/washington/escience/myria/coordinator/catalog/MasterCatalog.java src/edu/washington/escience/myria/operator/Operator.java src/edu/washington/escience/myria/parallel/QuerySubTreeTask.java src/edu/washington/escience/myria/parallel/Server.java
2 parents 178fa9b + 1c9926d commit 6b57753

File tree

92 files changed

+975
-1408
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

92 files changed

+975
-1408
lines changed

conf/log4j.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ log4j.appender.stderr.Threshold=WARN
1818
#
1919
log4j.appender.A1.Append=true
2020
log4j.appender.A1.File=englink-log4j.log
21-
log4j.appender.A1.Threshold=INFO
21+
log4j.appender.A1.Threshold=DEBUG
2222
log4j.appender.A1=org.apache.log4j.FileAppender
2323
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
2424
log4j.appender.A1.layout.ConversionPattern=%-5p %d [%t] %C{1} - %m%n
+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
doit.sed

src/edu/washington/escience/myria/accessmethod/JdbcAccessMethod.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public void setReadOnly(final Boolean readOnly) throws DbException {
106106
@Override
107107
public void tupleBatchInsert(final RelationKey relationKey, final Schema schema, final TupleBatch tupleBatch)
108108
throws DbException {
109+
LOGGER.debug("Inserting batch of size {}", tupleBatch.numTuples());
109110
Objects.requireNonNull(jdbcConnection);
110111
if (jdbcInfo.getDbms().equals(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL)) {
111112
// Use the postgres COPY command which is much faster
@@ -118,8 +119,11 @@ public void tupleBatchInsert(final RelationKey relationKey, final Schema schema,
118119
tw.done();
119120

120121
Reader reader = new InputStreamReader(new ByteArrayInputStream(baos.toByteArray()));
121-
cpManager.copyIn("COPY " + relationKey.toString(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL)
122-
+ " FROM STDIN WITH CSV", reader);
122+
long inserted =
123+
cpManager.copyIn("COPY " + relationKey.toString(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL)
124+
+ " FROM STDIN WITH CSV", reader);
125+
Preconditions.checkState(inserted == tupleBatch.numTuples(),
126+
"Error: inserted a batch of size %s but only actually inserted %s rows", tupleBatch.numTuples(), inserted);
123127
} catch (final SQLException | IOException e) {
124128
LOGGER.error(e.getMessage(), e);
125129
throw new DbException(e);
@@ -139,6 +143,7 @@ public void tupleBatchInsert(final RelationKey relationKey, final Schema schema,
139143
throw new DbException(e);
140144
}
141145
}
146+
LOGGER.debug(".. done inserting batch of size {}", tupleBatch.numTuples());
142147
}
143148

144149
@Override
@@ -209,6 +214,7 @@ public void init() throws DbException {
209214
@Override
210215
public void execute(final String ddlCommand) throws DbException {
211216
Objects.requireNonNull(jdbcConnection);
217+
LOGGER.debug("Executing command {}", ddlCommand);
212218
Statement statement;
213219
try {
214220
statement = jdbcConnection.createStatement();
@@ -549,7 +555,6 @@ private TupleBatch getNextTB() throws SQLException {
549555
return new TupleBatch(schema, columns, numTuples);
550556
} else {
551557
return null;
552-
553558
}
554559
}
555560

Original file line numberDiff line numberDiff line change
@@ -1,12 +1,41 @@
11
package edu.washington.escience.myria.api.encoding;
22

3+
import java.util.List;
4+
import java.util.Set;
5+
36
import edu.washington.escience.myria.parallel.Consumer;
7+
import edu.washington.escience.myria.parallel.ExchangePairID;
48

5-
public abstract class AbstractConsumerEncoding<C extends Consumer> extends ExchangeEncoding<C> {
9+
public abstract class AbstractConsumerEncoding<C extends Consumer> extends LeafOperatorEncoding<C> implements
10+
ExchangeEncoding {
11+
@Required
612
public String argOperatorId;
713

814
String getOperatorId() {
915
return argOperatorId;
1016
}
1117

18+
private Set<Integer> realWorkerIds;
19+
private List<ExchangePairID> realOperatorIds;
20+
21+
@Override
22+
public final Set<Integer> getRealWorkerIds() {
23+
return realWorkerIds;
24+
}
25+
26+
@Override
27+
public final void setRealWorkerIds(Set<Integer> w) {
28+
realWorkerIds = w;
29+
}
30+
31+
@Override
32+
public final List<ExchangePairID> getRealOperatorIds() {
33+
return realOperatorIds;
34+
}
35+
36+
@Override
37+
public final void setRealOperatorIds(List<ExchangePairID> operatorIds) {
38+
realOperatorIds = operatorIds;
39+
}
40+
1241
}
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,33 @@
11
package edu.washington.escience.myria.api.encoding;
22

3+
import java.util.List;
4+
import java.util.Set;
5+
6+
import edu.washington.escience.myria.parallel.ExchangePairID;
37
import edu.washington.escience.myria.parallel.Producer;
48

5-
public abstract class AbstractProducerEncoding<P extends Producer> extends ExchangeEncoding<P> {
9+
public abstract class AbstractProducerEncoding<P extends Producer> extends UnaryOperatorEncoding<P> implements
10+
ExchangeEncoding {
11+
private Set<Integer> realWorkerIds;
12+
private List<ExchangePairID> realOperatorIds;
13+
14+
@Override
15+
public final Set<Integer> getRealWorkerIds() {
16+
return realWorkerIds;
17+
}
18+
19+
@Override
20+
public final void setRealWorkerIds(Set<Integer> w) {
21+
realWorkerIds = w;
22+
}
23+
24+
@Override
25+
public final List<ExchangePairID> getRealOperatorIds() {
26+
return realOperatorIds;
27+
}
28+
29+
@Override
30+
public final void setRealOperatorIds(List<ExchangePairID> operatorIds) {
31+
realOperatorIds = operatorIds;
32+
}
633
}

src/edu/washington/escience/myria/api/encoding/AggregateEncoding.java

+3-17
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,23 @@
22

33
import java.util.ArrayList;
44
import java.util.List;
5-
import java.util.Map;
65

7-
import com.google.common.collect.ImmutableList;
8-
9-
import edu.washington.escience.myria.operator.Operator;
106
import edu.washington.escience.myria.operator.agg.Aggregate;
117
import edu.washington.escience.myria.operator.agg.Aggregator;
128
import edu.washington.escience.myria.parallel.Server;
139

14-
public class AggregateEncoding extends OperatorEncoding<Aggregate> {
10+
public class AggregateEncoding extends UnaryOperatorEncoding<Aggregate> {
11+
@Required
1512
public List<List<String>> argAggOperators;
13+
@Required
1614
public int[] argAggFields;
17-
public String argChild;
18-
private static final List<String> requiredFields = ImmutableList.of("argChild", "argAggOperators", "argAggFields");
1915

2016
@Override
2117
public Aggregate construct(Server server) {
2218
int[] ops = deserializeAggregateOperator(argAggOperators);
2319
return new Aggregate(null, argAggFields, ops);
2420
}
2521

26-
@Override
27-
public void connect(Operator current, Map<String, Operator> operators) {
28-
current.setChildren(new Operator[] { operators.get(argChild) });
29-
}
30-
31-
@Override
32-
protected List<String> getRequiredArguments() {
33-
return requiredFields;
34-
}
35-
3622
/**
3723
* Deserializes Aggregate Operators.
3824
*
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,19 @@
11
package edu.washington.escience.myria.api.encoding;
22

33
import java.util.List;
4-
import java.util.Map;
5-
6-
import com.google.common.collect.ImmutableList;
74

85
import edu.washington.escience.myria.expression.Expression;
96
import edu.washington.escience.myria.operator.Apply;
10-
import edu.washington.escience.myria.operator.Operator;
117
import edu.washington.escience.myria.parallel.Server;
128

13-
public class ApplyEncoding extends OperatorEncoding<Apply> {
14-
15-
public String argChild;
9+
public class ApplyEncoding extends UnaryOperatorEncoding<Apply> {
1610

11+
@Required
1712
public List<Expression> emitExpressions;
1813

19-
private static final ImmutableList<String> requiredArguments = ImmutableList.of("argChild", "emitExpressions");
20-
2114
@Override
2215
public Apply construct(Server server) {
2316
return new Apply(null, emitExpressions);
2417
}
2518

26-
@Override
27-
public void connect(Operator current, Map<String, Operator> operators) {
28-
current.setChildren(new Operator[] { operators.get(argChild) });
29-
}
30-
31-
@Override
32-
protected List<String> getRequiredArguments() {
33-
return requiredArguments;
34-
}
3519
}
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,15 @@
11
package edu.washington.escience.myria.api.encoding;
22

3-
import java.util.List;
4-
import java.util.Map;
5-
6-
import com.google.common.collect.ImmutableList;
7-
83
import edu.washington.escience.myria.Schema;
94
import edu.washington.escience.myria.operator.BinaryFileScan;
10-
import edu.washington.escience.myria.operator.Operator;
115
import edu.washington.escience.myria.parallel.Server;
126

13-
public class BinaryFileScanEncoding extends OperatorEncoding<BinaryFileScan> {
7+
public class BinaryFileScanEncoding extends LeafOperatorEncoding<BinaryFileScan> {
8+
@Required
149
public Schema schema;
10+
@Required
1511
public String fileName;
1612
public Boolean isLittleEndian;
17-
private static final List<String> requiredArguments = ImmutableList.of("schema", "fileName");
1813

1914
@Override
2015
public BinaryFileScan construct(final Server server) {
@@ -25,13 +20,4 @@ public BinaryFileScan construct(final Server server) {
2520
}
2621
}
2722

28-
@Override
29-
public void connect(Operator current, Map<String, Operator> operators) {
30-
/* Do nothing; no children. */
31-
}
32-
33-
@Override
34-
protected List<String> getRequiredArguments() {
35-
return requiredArguments;
36-
}
3723
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package edu.washington.escience.myria.api.encoding;
2+
3+
import java.util.Map;
4+
5+
import edu.washington.escience.myria.operator.Operator;
6+
7+
/**
8+
* A JSON-able wrapper for the expected wire message for an operator. To add a new operator, three things need to be
9+
* done.
10+
*
11+
* 1. Create an Encoding class that extends OperatorEncoding.
12+
*
13+
* 2. Add the operator to the list of (alphabetically sorted) JsonSubTypes below.
14+
*/
15+
public abstract class BinaryOperatorEncoding<T extends Operator> extends OperatorEncoding<T> {
16+
17+
@Required
18+
public String argChild1;
19+
20+
@Required
21+
public String argChild2;
22+
23+
@Override
24+
public final void connect(Operator current, Map<String, Operator> operators) {
25+
current.setChildren(new Operator[] { operators.get(argChild1), operators.get(argChild2) });
26+
}
27+
28+
}
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
package edu.washington.escience.myria.api.encoding;
22

3-
import java.util.List;
4-
import java.util.Map;
5-
6-
import com.google.common.collect.ImmutableList;
7-
8-
import edu.washington.escience.myria.operator.Operator;
93
import edu.washington.escience.myria.parallel.GenericShuffleConsumer;
104
import edu.washington.escience.myria.parallel.Server;
115
import edu.washington.escience.myria.util.MyriaUtils;
@@ -19,23 +13,10 @@
1913
*/
2014
public class BroadcastConsumerEncoding extends AbstractConsumerEncoding<GenericShuffleConsumer> {
2115

22-
private static final List<String> requiredArguments = ImmutableList.of("argOperatorId");
23-
24-
@Override
25-
public void connect(final Operator current, final Map<String, Operator> operators) {
26-
/* Do nothing; no children. */
27-
28-
}
29-
3016
@Override
3117
public GenericShuffleConsumer construct(Server server) {
3218
return new GenericShuffleConsumer(null, MyriaUtils.getSingleElement(getRealOperatorIds()), MyriaUtils
3319
.integerCollectionToIntArray(getRealWorkerIds()));
3420
}
3521

36-
@Override
37-
protected List<String> getRequiredArguments() {
38-
return requiredArguments;
39-
}
40-
4122
}
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
package edu.washington.escience.myria.api.encoding;
22

3-
import java.util.List;
4-
import java.util.Map;
5-
6-
import com.google.common.collect.ImmutableList;
7-
8-
import edu.washington.escience.myria.operator.Operator;
93
import edu.washington.escience.myria.parallel.FixValuePartitionFunction;
104
import edu.washington.escience.myria.parallel.GenericShuffleProducer;
115
import edu.washington.escience.myria.parallel.Server;
@@ -20,15 +14,6 @@
2014
*/
2115
public class BroadcastProducerEncoding extends AbstractProducerEncoding<GenericShuffleProducer> {
2216

23-
public String argChild;
24-
25-
private static final List<String> requiredArguments = ImmutableList.of("argChild");
26-
27-
@Override
28-
public void connect(final Operator current, final Map<String, Operator> operators) {
29-
current.setChildren(new Operator[] { operators.get(argChild) });
30-
}
31-
3217
@Override
3318
public GenericShuffleProducer construct(Server server) {
3419
int[][] cellPartition = new int[1][];
@@ -41,9 +26,4 @@ public GenericShuffleProducer construct(Server server) {
4126
MyriaUtils.integerCollectionToIntArray(getRealWorkerIds()), new FixValuePartitionFunction(0));
4227
}
4328

44-
@Override
45-
protected List<String> getRequiredArguments() {
46-
return requiredArguments;
47-
}
48-
4929
}
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,15 @@
11
package edu.washington.escience.myria.api.encoding;
22

3-
import java.util.List;
4-
import java.util.Map;
5-
6-
import com.google.common.collect.ImmutableList;
7-
8-
import edu.washington.escience.myria.operator.Operator;
93
import edu.washington.escience.myria.parallel.CollectConsumer;
104
import edu.washington.escience.myria.parallel.Server;
115
import edu.washington.escience.myria.util.MyriaUtils;
126

137
public class CollectConsumerEncoding extends AbstractConsumerEncoding<CollectConsumer> {
14-
private static final List<String> requiredArguments = ImmutableList.of("argOperatorId");
15-
16-
@Override
17-
public void connect(final Operator current, final Map<String, Operator> operators) {
18-
/* Do nothing. */
19-
}
208

219
@Override
2210
public CollectConsumer construct(Server server) {
2311
return new CollectConsumer(null, MyriaUtils.getSingleElement(getRealOperatorIds()), MyriaUtils
2412
.integerCollectionToIntArray(getRealWorkerIds()));
2513
}
2614

27-
@Override
28-
protected List<String> getRequiredArguments() {
29-
return requiredArguments;
30-
}
31-
3215
}

0 commit comments

Comments
 (0)