Skip to content
This repository was archived by the owner on Jun 15, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 193 additions & 73 deletions src/main/java/org/tango/jhdb/PostgreSQLSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.List;

Expand Down Expand Up @@ -327,35 +328,20 @@ HdbDataSet getDataFromDB(SignalInfo sigInfo,

// Fetch data
PreparedStatement statement;
Map<HdbData.Aggregate, Integer> agg_idxes = new HashMap<>();
if(!prepQueries.containsKey(sigInfo)) {
if(isAggregate) {
switch (sigInfo.dataType) {
case DOUBLE:
case FLOAT:
query = "SELECT data_time, count_rows, count_errors, count_r, count_nan_r, mean_r, min_r, max_r, stddev_r" +
", count_w, count_nan_w, mean_w, min_w, max_w, stddev_w" +
" FROM " + tablename +
" WHERE att_conf_id= ?" +
" AND data_time>= ?" +
" AND data_time<= ?" +
" ORDER BY data_time ASC";
break;
case LONG:
case ULONG:
case LONG64:
case ULONG64:
case SHORT:
case USHORT:
query = "SELECT data_time, count_rows, count_errors, count_r, mean_r, min_r, max_r, stddev_r" +
", count_w, mean_w, min_w, max_w, stddev_w" +
" FROM " + tablename +
" WHERE att_conf_id= ?" +
" AND data_time>= ?" +
" AND data_time<= ?" +
" ORDER BY data_time ASC";
break;
default:
throw new HdbFailed("Aggregates are not supported for type: " + sigInfo.dataType);
String agg_query = sigInfo.getAggregateQueryList(agg_idxes);
query = "SELECT data_time, " + agg_query +
" FROM " + tablename +
" WHERE att_conf_id= ?" +
" AND data_time>= ?" +
" AND data_time<= ?" +
" ORDER BY data_time ASC";
try {
statement = connection.prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
} catch (SQLException e) {
throw new HdbFailed("An error occurred upon query preparation for query: " + query);
}
}
else
Expand All @@ -368,20 +354,23 @@ HdbDataSet getDataFromDB(SignalInfo sigInfo,
" AND data_time>= ?" +
" AND data_time<= ?" +
" ORDER BY data_time ASC";
try {
prepQueries.put(sigInfo, connection.prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY));
} catch (SQLException e) {
throw new HdbFailed("An error occurred upon query preparation for query: " + query);
}
statement = prepQueries.get(sigInfo);
}

try {
prepQueries.put(sigInfo, connection.prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY));
} catch (SQLException e) {
throw new HdbFailed("An error occurred upon query preparation for query: " + query);
}
}
else
{
//retrieve prepared statement
statement = prepQueries.get(sigInfo);
}

ArrayList<HdbData> ret = new ArrayList<>();


//retrieve prepared statement
statement = prepQueries.get(sigInfo);
try {

//fill the placeholders
Expand All @@ -399,7 +388,9 @@ HdbDataSet getDataFromDB(SignalInfo sigInfo,

if(isAggregate)
{
extractAggregateData(rs, sigInfo, isRW, isWO, queryCount, ret);
extractAggregateData(rs, sigInfo, isRW, isWO, queryCount, agg_idxes, ret);
// For aggregates we do not cache the statement, so we have to close it.
statement.close();
}
else
{
Expand Down Expand Up @@ -499,7 +490,7 @@ private void extractRawData(ResultSet rs, SignalInfo sigInfo, boolean isRW, bool
}
}

private void extractAggregateData(ResultSet rs, SignalInfo info, boolean isRW, boolean isWO, int queryCount, List<HdbData> data) throws SQLException, HdbFailed
private void extractAggregateData(ResultSet rs, SignalInfo info, boolean isRW, boolean isWO, int queryCount, Map<HdbData.Aggregate, Integer> indexes, List<HdbData> data) throws SQLException, HdbFailed
{
int nbRow = 0;
boolean isFloating = info.dataType == HdbSigInfo.Type.DOUBLE || info.dataType == HdbSigInfo.Type.FLOAT;
Expand All @@ -513,8 +504,8 @@ private void extractAggregateData(ResultSet rs, SignalInfo info, boolean isRW, b
}

long dTime = 0;
long count_rows;
long count_errors;
long count_rows = 0;
long count_errors = 0;

while (rs.next()) {
ArrayList<Long> count_r = new ArrayList<>();
Expand All @@ -530,41 +521,170 @@ private void extractAggregateData(ResultSet rs, SignalInfo info, boolean isRW, b
ArrayList<Number> max_w = new ArrayList<>();
ArrayList<Double> stddev_w = new ArrayList<>();
dTime = timeValue(rs.getTimestamp(1));
count_rows = rs.getLong(2);
count_errors = rs.getLong(3);
if (isArray) {
convertLongArray(count_r, rs.getArray(4));
convertDoubleArray(mean_r, rs.getArray(6 - floatingOffset1));
convertNumberArray(min_r, rs.getArray(7 - floatingOffset1), info.dataType);
convertNumberArray(max_r, rs.getArray(8 - floatingOffset1), info.dataType);
convertDoubleArray(stddev_r, rs.getArray(9 - floatingOffset1));
convertLongArray(count_w, rs.getArray(10 - floatingOffset1));
convertDoubleArray(mean_w, rs.getArray(12 - floatingOffset2));
convertNumberArray(min_w, rs.getArray(13 - floatingOffset2), info.dataType);
convertNumberArray(max_w, rs.getArray(14 - floatingOffset2), info.dataType);
convertDoubleArray(stddev_w, rs.getArray(15 - floatingOffset2));
if(isFloating)
{
convertLongArray(count_nan_r, rs.getArray(5));
convertLongArray(count_nan_w, rs.getArray(11));
}
}
else

for(Map.Entry<HdbData.Aggregate, Integer> agg_idx : indexes.entrySet())
{
count_r.add(rs.getLong(4));
mean_r.add(rs.getDouble(6 - floatingOffset1));
min_r.add(extractNumber(rs, 7 - floatingOffset1, info.dataType));
max_r.add(extractNumber(rs, 8 - floatingOffset1, info.dataType));
stddev_r.add(rs.getDouble(9 - floatingOffset1));
count_w.add(rs.getLong(10 - floatingOffset1));
mean_w.add(rs.getDouble(12 - floatingOffset2));
min_w.add(extractNumber(rs, 13-floatingOffset2, info.dataType));
max_w.add(extractNumber(rs, 14-floatingOffset2, info.dataType));
stddev_w.add(rs.getDouble(15 - floatingOffset2));
if(isFloating) {
count_nan_r.add(rs.getLong(5));
count_nan_w.add(rs.getLong(11));
}
HdbData.Aggregate agg = agg_idx.getKey();
// We had 2 because indexing start at 1 for sql result
// and the first one is always the timestamp.
int idx = agg_idx.getValue() + 2;
switch(agg)
{
case ROWS_COUNT:
{
count_rows = rs.getLong(idx);
break;
}
case ERRORS_COUNT:
{
count_errors = rs.getLong(idx);
break;
}
case COUNT_R:
{
if(isArray)
{
convertLongArray(count_r, rs.getArray(idx));
}
else
{
count_r.add(rs.getLong(idx));
}
break;
}
case NAN_COUNT_R:
{
if(isArray)
{
convertLongArray(count_r, rs.getArray(idx));
}
else
{
count_r.add(rs.getLong(idx));
}
break;
}
case MEAN_R:
{
if(isArray)
{
convertDoubleArray(mean_r, rs.getArray(idx));
}
else
{
mean_r.add(rs.getDouble(idx));
}
break;
}
case MIN_R:
{
if(isArray)
{
convertNumberArray(min_r, rs.getArray(idx), info.dataType);
}
else
{
min_r.add(extractNumber(rs, idx, info.dataType));
}
break;
}
case MAX_R:
{
if(isArray)
{
convertNumberArray(max_r, rs.getArray(idx), info.dataType);
}
else
{
max_r.add(extractNumber(rs, idx, info.dataType));
}
break;
}
case STDDEV_R:
{
if(isArray)
{
convertDoubleArray(stddev_r, rs.getArray(idx));
}
else
{
stddev_r.add(rs.getDouble(idx));
}
break;
}
case COUNT_W:
{
if(isArray)
{
convertLongArray(count_w, rs.getArray(idx));
}
else
{
count_w.add(rs.getLong(idx));
}
break;
}
case NAN_COUNT_W:
{
if(isArray)
{
convertLongArray(count_w, rs.getArray(idx));
}
else
{
count_w.add(rs.getLong(idx));
}
break;
}
case MEAN_W:
{
if(isArray)
{
convertDoubleArray(mean_w, rs.getArray(idx));
}
else
{
mean_w.add(rs.getDouble(idx));
}
break;
}
case MIN_W:
{
if(isArray)
{
convertNumberArray(min_w, rs.getArray(idx), info.dataType);
}
else
{
min_w.add(extractNumber(rs, idx, info.dataType));
}
break;
}
case MAX_W:
{
if(isArray)
{
convertNumberArray(max_w, rs.getArray(idx), info.dataType);
}
else
{
max_w.add(extractNumber(rs, idx, info.dataType));
}
break;
}
case STDDEV_W:
{
if(isArray)
{
convertDoubleArray(stddev_w, rs.getArray(idx));
}
else
{
stddev_w.add(rs.getDouble(idx));
}
break;
}
}
}

HdbData hd = HdbData.createData(info);
Expand Down
33 changes: 32 additions & 1 deletion src/main/java/org/tango/jhdb/SignalInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

import org.tango.jhdb.data.HdbData;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;

/**
* Signal info structure
Expand Down Expand Up @@ -340,8 +342,37 @@ public boolean isState() {
* Returns true if this signal is aggregated data, false if it is raw.
*/
public boolean isAggregate() {
return Interval.isAggregate(interval);
return Interval.isAggregate(interval);
}

/**
* Returns true if this signal is aggregated data, false if it is raw.
*/
public String getAggregateQueryList(Map<HdbData.Aggregate, Integer> indexes) {
int idx = 0;
int nbAggregates = aggregates.size();
StringBuffer queryList = new StringBuffer();

if(indexes == null)
{
indexes = new HashMap<>();
}

for(HdbData.Aggregate agg : aggregates)
{
queryList.append(agg.toString());

if(idx != nbAggregates - 1)
{
queryList.append(", ");
}

indexes.put(agg, idx);
++idx;
}

return queryList.toString();
}

public String toString() {
return "Id=" + sigId + ", Type=" + dataType.toString() + ", Format=" + format.toString() + ", Access=" + access.toString()+ ", Interval=" + interval.toString();
Expand Down
Loading