Skip to content

Commit 1b16f80

Browse files
committed
Merge branch 'slxu-fix-network-backpressure' into slxu-kill-query-at-aborted-downloading
Conflicts: src/edu/washington/escience/myria/parallel/Server.java
2 parents 0466cb4 + 0493436 commit 1b16f80

File tree

11 files changed

+217
-167
lines changed

11 files changed

+217
-167
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ before_install:
1212
- sh travis/before_install.sh
1313
- export TERM=dumb
1414
after_success:
15-
- gradle jacocoTestReport coveralls
15+
- ./gradlew jacocoTestReport coveralls
1616

1717

1818

build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ apply plugin: "java"
44
apply plugin: "eclipse"
55
/* Coveralls with JaCoCo */
66
apply plugin: 'jacoco'
7-
apply plugin: 'coveralls'
7+
apply plugin: 'com.github.kt3k.coveralls'
88

99
/* Set up group and version info for myria */
1010
archivesBaseName = "myria"
@@ -26,7 +26,7 @@ buildscript {
2626
}
2727

2828
dependencies {
29-
classpath 'org.kt3k.gradle.plugin:coveralls-gradle-plugin:0.4.0'
29+
classpath 'org.kt3k.gradle.plugin:coveralls-gradle-plugin:2.0.0'
3030
}
3131
}
3232
/* Tell gradle where the source code is located. */

src/edu/washington/escience/myria/coordinator/catalog/WorkerCatalog.java

+33-34
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222

2323
/**
2424
* This class is intended to store the configuration and catalog information for a Myria worker.
25-
*
26-
*
25+
*
26+
*
2727
*/
2828
public final class WorkerCatalog {
2929
/** The logger for this class. */
@@ -33,7 +33,7 @@ public final class WorkerCatalog {
3333
* @param filename the path to the SQLite database storing the worker catalog.
3434
* @return a fresh WorkerCatalog fitting the specified description.
3535
* @throws CatalogException if there is an error creating the database or the file already exists.
36-
*
36+
*
3737
* TODO add some sanity checks to the filename?
3838
*/
3939
public static WorkerCatalog create(final String filename) throws CatalogException {
@@ -46,7 +46,7 @@ public static WorkerCatalog create(final String filename) throws CatalogExceptio
4646
* @param overwrite specifies whether to overwrite an existing WorkerCatalog.
4747
* @return a fresh WorkerCatalog fitting the specified description.
4848
* @throws CatalogException if the database already exists, or there is an error creating it.
49-
*
49+
*
5050
* TODO add some sanity checks to the filename?
5151
*/
5252
public static WorkerCatalog create(final String filename, final boolean overwrite) throws CatalogException {
@@ -61,12 +61,12 @@ public static WorkerCatalog create(final String filename, final boolean overwrit
6161
}
6262

6363
/**
64-
*
64+
*
6565
* @param catalogFile a File object pointing to the SQLite database that will store the WorkerCatalog. If catalogFile
6666
* is null, this creates an in-memory SQLite database.
6767
* @return a fresh WorkerCatalog fitting the specified description.
6868
* @throws CatalogException if there is an error opening the database.
69-
*
69+
*
7070
* TODO add some sanity checks to the filename?
7171
*/
7272
private static WorkerCatalog createFromFile(final File catalogFile) throws CatalogException {
@@ -89,36 +89,36 @@ private static WorkerCatalog createFromFile(final File catalogFile) throws Catal
8989
sqliteConnection.exec("DROP TABLE IF EXISTS configuration");
9090
sqliteConnection.exec(
9191
"CREATE TABLE configuration (\n"
92-
+ " key STRING UNIQUE NOT NULL,\n"
93-
+ " value STRING NOT NULL);");
92+
+ " key STRING UNIQUE NOT NULL,\n"
93+
+ " value STRING NOT NULL);");
9494
sqliteConnection.exec("DROP TABLE IF EXISTS masters");
9595
sqliteConnection.exec(
9696
"CREATE TABLE masters (\n"
97-
+ " master_id INTEGER PRIMARY KEY ASC,\n"
98-
+ " host_port STRING NOT NULL);");
97+
+ " master_id INTEGER PRIMARY KEY ASC,\n"
98+
+ " host_port STRING NOT NULL);");
9999
sqliteConnection.exec("DROP TABLE IF EXISTS workers");
100100
sqliteConnection.exec(
101101
"CREATE TABLE workers (\n"
102-
+ " worker_id INTEGER PRIMARY KEY ASC,\n"
103-
+ " host_port STRING NOT NULL);");
102+
+ " worker_id INTEGER PRIMARY KEY ASC,\n"
103+
+ " host_port STRING NOT NULL);");
104104
sqliteConnection.exec("DROP TABLE IF EXISTS relations");
105105
sqliteConnection.exec(
106106
"CREATE TABLE relations (\n"
107-
+ " relation_id INTEGER PRIMARY KEY ASC,\n"
108-
+ " relation_name STRING NOT NULL UNIQUE);");
107+
+ " relation_id INTEGER PRIMARY KEY ASC,\n"
108+
+ " relation_name STRING NOT NULL UNIQUE);");
109109
sqliteConnection.exec("DROP TABLE IF EXISTS relation_schema");
110110
sqliteConnection.exec(
111111
"CREATE TABLE relation_schema (\n"
112-
+ " relation_id INTEGER NOT NULL REFERENCES relations(relation_id),\n"
113-
+ " col_index INTEGER NOT NULL,\n"
114-
+ " col_name STRING,\n"
115-
+ " col_type STRING NOT NULL);");
112+
+ " relation_id INTEGER NOT NULL REFERENCES relations(relation_id),\n"
113+
+ " col_index INTEGER NOT NULL,\n"
114+
+ " col_name STRING,\n"
115+
+ " col_type STRING NOT NULL);");
116116
sqliteConnection.exec("DROP TABLE IF EXISTS shards");
117117
sqliteConnection.exec(
118118
"CREATE TABLE shards (\n"
119-
+ " stored_relation_id INTEGER NOT NULL REFERENCES stored_relations(stored_relation_id),\n"
120-
+ " shard_index INTEGER NOT NULL,\n"
121-
+ " location STRING NOT NULL);");
119+
+ " stored_relation_id INTEGER NOT NULL REFERENCES stored_relations(stored_relation_id),\n"
120+
+ " shard_index INTEGER NOT NULL,\n"
121+
+ " location STRING NOT NULL);");
122122
sqliteConnection.exec("COMMIT TRANSACTION");
123123
/* @formatter:on*/
124124
} catch (final SQLiteException e) {
@@ -147,7 +147,7 @@ private static WorkerCatalog createFromFile(final File catalogFile) throws Catal
147147
/**
148148
* @return a fresh WorkerCatalog fitting the specified description.
149149
* @throws CatalogException if there is an error opening the database.
150-
*
150+
*
151151
* TODO add some sanity checks to the filename?
152152
*/
153153
public static WorkerCatalog createInMemory() throws CatalogException {
@@ -156,12 +156,12 @@ public static WorkerCatalog createInMemory() throws CatalogException {
156156

157157
/**
158158
* Opens the worker catalog stored as a SQLite database in the specified file.
159-
*
159+
*
160160
* @param filename the path to the SQLite database storing the catalog.
161161
* @return an initialized WorkerCatalog object ready to be used for experiments.
162162
* @throws FileNotFoundException if the given file does not exist.
163163
* @throws CatalogException if there is an error connecting to the database.
164-
*
164+
*
165165
* TODO add some sanity checks to the filename?
166166
*/
167167
public static WorkerCatalog open(final String filename) throws FileNotFoundException, CatalogException {
@@ -195,7 +195,7 @@ public static WorkerCatalog open(final String filename) throws FileNotFoundExcep
195195

196196
/**
197197
* Not publicly accessible.
198-
*
198+
*
199199
* @param sqliteConnection connection to the SQLite database that stores the WorkerCatalog.
200200
* @throws SQLiteException if there is an error turning on foreign keys.
201201
*/
@@ -208,7 +208,7 @@ private WorkerCatalog(final SQLiteConnection sqliteConnection) throws SQLiteExce
208208

209209
/**
210210
* Adds a master using the specified host and port to the WorkerCatalog.
211-
*
211+
*
212212
* @param hostPortString specifies the path to the master in the format "host:port"
213213
* @return this WorkerCatalog
214214
* @throws CatalogException if the hostPortString is invalid or there is a database exception.
@@ -237,13 +237,13 @@ public WorkerCatalog addMaster(final String hostPortString) throws CatalogExcept
237237

238238
/**
239239
* Adds the metadata for a relation into the WorkerCatalog.
240-
*
240+
*
241241
* @param name the name of the relation.
242242
* @param schema the schema of the relation.
243243
* @throws CatalogException if the relation is already in the WorkerCatalog or there is an error in the database.
244-
*
244+
*
245245
* TODO if we use SQLite in a multithreaded way this will need to be revisited.
246-
*
246+
*
247247
*/
248248
public void addRelationMetadata(final String name, final Schema schema) throws CatalogException {
249249
Objects.requireNonNull(name);
@@ -291,7 +291,7 @@ public void addRelationMetadata(final String name, final Schema schema) throws C
291291

292292
/**
293293
* Adds a worker using the specified host and port to the WorkerCatalog.
294-
*
294+
*
295295
* @param workerId specifies the global identifier of this worker.
296296
* @param hostPortString specifies the path to the worker in the format "host:port".
297297
* @return this WorkerCatalog.
@@ -333,7 +333,6 @@ public WorkerCatalog addWorkers(final Map<String, String> id2HostPortString) thr
333333
throw new CatalogException("WorkerCatalog is closed.");
334334
}
335335
try {
336-
/* Just used to verify that hostPortString is legal */
337336
final SQLiteStatement statement =
338337
sqliteConnection.prepare("INSERT INTO workers(worker_id, host_port) VALUES(?,?);", false);
339338
sqliteConnection.exec("BEGIN TRANSACTION");
@@ -379,7 +378,7 @@ public void close() {
379378
/**
380379
* Extract the value of a particular configuration parameter from the database. Returns null if the parameter is not
381380
* configured.
382-
*
381+
*
383382
* @param key the name of the configuration parameter.
384383
* @return the value of the configuration parameter, or null if that configuration is not supported.
385384
* @throws CatalogException if there is an error in the backing database.
@@ -489,7 +488,7 @@ public Map<Integer, SocketInfo> getWorkers() throws CatalogException {
489488
/**
490489
* Extract the value of a particular configuration parameter from the database. Returns null if the parameter is not
491490
* configured.
492-
*
491+
*
493492
* @param key the name of the configuration parameter.
494493
* @param value the value of the configuration parameter.
495494
* @throws CatalogException if there is an error in the backing database.
@@ -515,7 +514,7 @@ public void setConfigurationValue(final String key, final String value) throws C
515514

516515
/**
517516
* Set all the configuration values in the provided map in a single transaction.
518-
*
517+
*
519518
* @param entries the value of the configuration parameter.
520519
* @throws CatalogException if there is an error in the backing database.
521520
*/

src/edu/washington/escience/myria/expression/CastExpression.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public CastExpression(final ExpressionOperator left, final ExpressionOperator ri
9292
public Type getOutputType(final ExpressionOperatorParameter parameters) {
9393
final Type castFrom = getLeft().getOutputType(parameters);
9494
final Type castTo = getRight().getOutputType(parameters);
95-
Preconditions.checkArgument(getCastType(castFrom, castTo) != CastType.UNSUPPORTED,
95+
Preconditions.checkArgument((castFrom == castTo) || getCastType(castFrom, castTo) != CastType.UNSUPPORTED,
9696
"Cast from %s to %s is not supported.", castFrom, castTo);
9797
return castTo;
9898
}

src/edu/washington/escience/myria/operator/agg/MultiGroupByAggregate.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,10 @@ private TupleBatch getResultBatch() throws DbException {
212212
TupleBatchBuffer curGroupAggs = new TupleBatchBuffer(aggSchema);
213213
for (int row = 0; row < curGroupKeys.numTuples(); ++row) {
214214
Aggregator[] rowAggs = groupAggs.get(row);
215-
for (int i = 0; i < rowAggs.length; ++i) {
216-
rowAggs[i].getResult(curGroupAggs, i);
215+
int curCol = 0;
216+
for (Aggregator rowAgg : rowAggs) {
217+
rowAgg.getResult(curGroupAggs, curCol);
218+
curCol += rowAgg.getResultSchema().numColumns();
217219
}
218220
}
219221
TupleBatch aggResults = curGroupAggs.popAny();
@@ -252,10 +254,8 @@ public Schema generateSchema() {
252254

253255
for (AggregatorFactory f : factories) {
254256
Schema curAggSchema = f.getResultSchema(inputSchema);
255-
Preconditions.checkState(curAggSchema.numColumns() == 1, "aggSchema should only have 1 column, not %s",
256-
curAggSchema.numColumns());
257-
aggTypes.add(curAggSchema.getColumnType(0));
258-
aggNames.add(curAggSchema.getColumnName(0));
257+
aggTypes.addAll(curAggSchema.getColumnTypes());
258+
aggNames.addAll(curAggSchema.getColumnNames());
259259
}
260260
aggSchema = new Schema(aggTypes, aggNames);
261261
return Schema.merge(groupSchema, aggSchema);

0 commit comments

Comments
 (0)