From 01d5e0dc431e079117b8bb5c7073a2c8436af12d Mon Sep 17 00:00:00 2001 From: Damien Lacoste Date: Tue, 15 Feb 2022 17:47:34 +0100 Subject: [PATCH] Add support for mysql light schema. Detection based on the engine used on att_conf table. Support only json arrays --- src/main/java/org/tango/jhdb/Hdb.java | 5 +- .../java/org/tango/jhdb/MySQLLightSchema.java | 388 ++++++++++++++++++ src/main/java/org/tango/jhdb/MySQLSchema.java | 127 +++++- 3 files changed, 507 insertions(+), 13 deletions(-) create mode 100644 src/main/java/org/tango/jhdb/MySQLLightSchema.java diff --git a/src/main/java/org/tango/jhdb/Hdb.java b/src/main/java/org/tango/jhdb/Hdb.java index f98f426..908a051 100644 --- a/src/main/java/org/tango/jhdb/Hdb.java +++ b/src/main/java/org/tango/jhdb/Hdb.java @@ -132,15 +132,14 @@ public String getDBTypeName() { */ public void connectMySQL(String host,String db,String user,String passwd,short port) throws HdbFailed { hdbType = HDB_MYSQL; - schema = new MySQLSchema(host,db,user,passwd,port); + schema = MySQLSchema.createSchema(host,db,user,passwd,port); } /** * Connects to a MySQL HDB. */ public void connectMySQL() throws HdbFailed { - hdbType = HDB_MYSQL; - schema = new MySQLSchema(null,null,null,null,(short)0); + connectMySQL(null, null, null, null, (short)0); } /** diff --git a/src/main/java/org/tango/jhdb/MySQLLightSchema.java b/src/main/java/org/tango/jhdb/MySQLLightSchema.java new file mode 100644 index 0000000..2e08a12 --- /dev/null +++ b/src/main/java/org/tango/jhdb/MySQLLightSchema.java @@ -0,0 +1,388 @@ +//+====================================================================== +// $Source: $ +// +// Project: Tango +// +// Description: java source code for HDB extraction library. +// +// $Author: pons $ +// +// Copyright (C) : 2015 +// European Synchrotron Radiation Facility +// BP 220, Grenoble 38043 +// FRANCE +// +// This file is part of Tango. +// +// Tango is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Tango is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Tango. If not, see . +// +// $Revision $ +// +//-====================================================================== +package org.tango.jhdb; + +import org.tango.jhdb.data.HdbData; +import org.tango.jhdb.data.HdbDataSet; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * MySQL database access + */ +public class MySQLLightSchema extends MySQLSchema { + + public MySQLLightSchema(String url, Connection c) throws HdbFailed + { + super(url, c); + } + + private List parseJson(String json) + { + ArrayList ret = new ArrayList<>(); + // This is not ideal but does not require extra dependencies to do the json parsing. + int start = json.indexOf('[') + 1; + int end = json.lastIndexOf(']'); + String[] array = json.substring(start, end).split(","); + for(String val : array) + { + ret.add(val.trim()); + } + return ret; + } + + @Override + public HdbSigParam getLastParam(SignalInfo sigInfo) throws HdbFailed { + + String query = "SELECT recv_time,label,unit,standard_unit,display_unit,format,"+ + "archive_rel_change,archive_abs_change,archive_period,description" + + " FROM att_parameter " + + " WHERE att_conf_id='" + sigInfo.sigId + "'" + + " ORDER BY recv_time DESC limit 1"; + + HdbSigParam ret = new HdbSigParam(sigInfo); + + try { + + Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY); + ResultSet rs = statement.executeQuery(query); + if(rs.next()) { + + ret.recvTime = timeValue(rs.getTimestamp(1)); + ret.insertTime = 0; + ret.label = rs.getString(2); + ret.unit = rs.getString(3); + try { + ret.standard_unit = Double.parseDouble(rs.getString(4)); + } catch (NumberFormatException e) { + ret.standard_unit = 1.0; + } + try { + ret.display_unit = Double.parseDouble( rs.getString(5)); + } catch (NumberFormatException e) { + ret.display_unit = 1.0; + } + ret.format = rs.getString(6); + ret.archive_rel_change = rs.getString(7); + ret.archive_abs_change = rs.getString(8); + ret.archive_period = rs.getString(9); + ret.description = rs.getString(10); + + } else { + throw new HdbFailed("Cannot get parameter for " + sigInfo.name); + } + + statement.close(); + + } catch (SQLException e) { + throw new HdbFailed("Failed to get parameter history: "+e.getMessage()); + } + + return ret; + + } + + @Override + public ArrayList getParams(SignalInfo sigInfo, + String start_date, + String stop_date) throws HdbFailed { + + checkDates(start_date,stop_date); + + String query = "SELECT recv_time,label,unit,standard_unit,display_unit,format,"+ + "archive_rel_change,archive_abs_change,archive_period,description" + + " FROM att_parameter " + + " WHERE att_conf_id='" + sigInfo.sigId + "'" + + " AND recv_time>='" + toDBDate(start_date) + "'" + + " AND recv_time<='" + toDBDate(stop_date) + "'" + + " ORDER BY recv_time ASC"; + + ArrayList ret = new ArrayList(); + + try { + + Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY); + ResultSet rs = statement.executeQuery(query); + while(rs.next()) { + + HdbSigParam hd = new HdbSigParam(sigInfo); + hd.recvTime = timeValue(rs.getTimestamp(1)); + hd.insertTime = 0; + hd.label = rs.getString(2); + hd.unit = rs.getString(3); + try { + hd.standard_unit = Double.parseDouble(rs.getString(4)); + } catch (NumberFormatException e) { + hd.standard_unit = 1.0; + } + try { + hd.display_unit = Double.parseDouble( rs.getString(5)); + } catch (NumberFormatException e) { + hd.display_unit = 1.0; + } + hd.format = rs.getString(6); + hd.archive_rel_change = rs.getString(7); + hd.archive_abs_change = rs.getString(8); + hd.archive_period = rs.getString(9); + hd.description = rs.getString(10); + + ret.add(hd); + + } + + statement.close(); + + } catch (SQLException e) { + throw new HdbFailed("Failed to get parameter history: "+e.getMessage()); + } + + return ret; + } + + // --------------------------------------------------------------------------------------- + @Override + protected HdbDataSet getArrayData(SignalInfo info, + String sigId, + String start_date, + String stop_date) throws HdbFailed { + + boolean isRW = info.isRW(); + + String query; + int queryCount=0; + String tablename = info.tableName; + if (hasProgressListener()) { + + // Get a count of the request + query = "SELECT count(*) FROM " + tablename + + " WHERE att_conf_id='" + sigId + "'" + + " AND data_time>='" + toDBDate(start_date) + "'" + + " AND data_time<='" + toDBDate(stop_date) + "'"; + + try { + + Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + ResultSet rs = statement.executeQuery(query); + rs.next(); + queryCount = rs.getInt(1); + statement.close(); + + } catch (SQLException e) { + throw new HdbFailed("Failed to get data: " + e.getMessage()); + } + + } + + // Fetch data + + String rwField = isRW?",value_w":""; + query = "SELECT data_time,recv_time,att_error_desc.error_desc as error_desc,quality,value_r"+rwField+ + " FROM " + tablename + + " left outer join att_error_desc on "+ tablename+".att_error_desc_id = att_error_desc.att_error_desc_id" + + " WHERE att_conf_id='" + sigId + "'" + + " AND data_time>='" + toDBDate(start_date) + "'" + + " AND data_time<='" + toDBDate(stop_date) + "'" + + " ORDER BY data_time ASC"; + + ArrayList ret = new ArrayList(); + ArrayList value = new ArrayList(); + ArrayList wvalue = null; + if(isRW) wvalue = new ArrayList(); + + try { + + long dTime = 0; + long newTime = 0; + long recvTime = 0; + long insertTime = 0; + String errorMsg = null; + int quality = 0; + int nbRow = 0; + boolean newItem = false; + + Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY); + statement.setFetchSize(arrayFetchSize); + ResultSet rs = statement.executeQuery(query); + while(rs.next()) { + + HdbData hd = HdbData.createData(info); + value.clear(); + value.addAll(parseJson(rs.getString(5))); + if(isRW) { + wvalue.clear(); + wvalue.addAll(parseJson(rs.getString(6))); + } + + hd.parse( + timeValue(rs.getTimestamp(1)), //Tango timestamp + 0, //Event recieve timestamp + timeValue(rs.getTimestamp(2)), //Recording timestamp + rs.getString(3), // Error string + rs.getInt(4), // Quality value + value, // Read value + wvalue // Write value + ); + + ret.add(hd); + + if(hasProgressListener() && (nbRow% PROGRESS_NBROW ==0)) + fireProgressListener((double)nbRow/(double)queryCount); + + nbRow++; + + } + + if( newItem ) { + + // Store last item + HdbData hd = HdbData.createData(info); + hd.parse( + dTime, // Tango timestamp + recvTime, // Event receive timestamp + insertTime,// Recording timestamp + errorMsg, // Error string + quality, // Quality value + value, // Read value + wvalue // Write value + ); + ret.add(hd); + + } + + statement.close(); + + } catch (SQLException e) { + throw new HdbFailed("Failed to get data: "+e.getMessage()); + } + + return new HdbDataSet(ret); + + } + + + // --------------------------------------------------------------------------------------- + @Override + protected HdbDataSet getScalarData(SignalInfo info, + String sigId, + String start_date, + String stop_date) throws HdbFailed { + + String query; + int queryCount=0; + String tablename = info.tableName; + if (hasProgressListener()) { + + // Get a count of the request + query = "SELECT count(*) FROM " + tablename + + " WHERE att_conf_id='" + sigId + "'" + + " AND data_time>='" + toDBDate(start_date) + "'" + + " AND data_time<='" + toDBDate(stop_date) + "'"; + + try { + + Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + ResultSet rs = statement.executeQuery(query); + rs.next(); + queryCount = rs.getInt(1); + statement.close(); + + } catch (SQLException e) { + throw new HdbFailed("Failed to get data: " + e.getMessage()); + } + + } + + boolean isRW = info.isRW(); + String rwField = isRW?",value_w":""; + query = "SELECT data_time,recv_time, att_error_desc.error_desc as error_desc,quality,value_r"+rwField+ + " FROM " + tablename + + " left outer join att_error_desc on "+ tablename+".att_error_desc_id = att_error_desc.att_error_desc_id" + + " WHERE att_conf_id='" + sigId + "'" + + " AND data_time>'" + toDBDate(start_date) + "'" + + " AND data_time<'" + toDBDate(stop_date) + "'" + + " ORDER BY data_time ASC"; + + ArrayList ret = new ArrayList(); + ArrayList value = new ArrayList(); + ArrayList wvalue = null; + if(isRW) wvalue = new ArrayList(); + int nbRow=0; + + try { + + Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY); + statement.setFetchSize(fetchSize); + ResultSet rs = statement.executeQuery(query); + while(rs.next()) { + + HdbData hd = HdbData.createData(info); + value.clear(); + value.add(rs.getString(5)); + if(isRW) { + wvalue.clear(); + wvalue.add(rs.getString(6)); + } + + hd.parse( + timeValue(rs.getTimestamp(1)), //Tango timestamp + 0, //Event recieve timestamp + timeValue(rs.getTimestamp(2)), //Recording timestamp + rs.getString(3), // Error string + rs.getInt(4), // Quality value + value, // Read value + wvalue // Write value + ); + + ret.add(hd); + + if(hasProgressListener() && (nbRow% PROGRESS_NBROW==0)) + fireProgressListener((double)nbRow/(double)queryCount); + + nbRow++; + + } + + statement.close(); + + } catch (SQLException e) { + throw new HdbFailed("Failed to get data: "+e.getMessage()); + } + + return new HdbDataSet(ret); + + } +} diff --git a/src/main/java/org/tango/jhdb/MySQLSchema.java b/src/main/java/org/tango/jhdb/MySQLSchema.java index e5d1d14..8c7d7c4 100644 --- a/src/main/java/org/tango/jhdb/MySQLSchema.java +++ b/src/main/java/org/tango/jhdb/MySQLSchema.java @@ -47,7 +47,7 @@ public class MySQLSchema extends HdbReader { public static final String DEFAULT_DB_URL_PREFIX = "jdbc:mysql://"; public static final int DEFAULT_DB_PORT = 3306; - private final static String[] tableNames = { + protected final static String[] tableNames = { "", "att_scalar_devdouble_ro", @@ -111,10 +111,116 @@ public class MySQLSchema extends HdbReader { }; // Notify every PROGRESS_NBROW rows - private final static int PROGRESS_NBROW =10000; - private Connection connection; - private AttributeBrowser browser=null; - private String dbURL; + protected final static int PROGRESS_NBROW =10000; + protected Connection connection; + protected AttributeBrowser browser=null; + protected String dbURL; + + private static boolean checkLightSchema(Connection c) throws HdbFailed + { + ArrayList list = new ArrayList<>(); + + String query = "SELECT ENGINE FROM information_schema.TABLES WHERE TABLE_NAME = 'att_conf';"; + + try { + Statement statement = c.createStatement(ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY); + ResultSet resultSet = statement.executeQuery(query); + while (resultSet.next()) + list.add(resultSet.getString(1)); + statement.close(); + } catch (SQLException e) { + throw new HdbFailed("Failed to retrieve engine mode: "+e.getMessage()); + } + + return list.get(0).equalsIgnoreCase("InnoDB"); + } + + public static MySQLSchema createSchema(String host, String db, String user, String passwd, short port) throws HdbFailed + { + if(host==null || host.isEmpty()) { + host = System.getenv("HDB_MYSQL_HOST"); + if (host==null || host.isEmpty()) { + host = System.getProperty("HDB_MYSQL_HOST"); + if (host==null || host.isEmpty()) + throw new HdbFailed("host input parameter cannot be null if HDB_MYSQL_HOST variable is not defined"); + } + } + + if(user==null || user.isEmpty()) { + user = System.getenv("HDB_USER"); + if (user==null || user.isEmpty()) { + user = System.getProperty("HDB_USER"); + if (user==null || user.isEmpty()) + user = DEFAULT_DB_USER; + } + } + + if(passwd==null || passwd.isEmpty()) { + passwd = System.getenv("HDB_PASSWORD"); + if (passwd==null || passwd.isEmpty()) { + passwd = System.getProperty("HDB_PASSWORD"); + if (passwd==null || passwd.isEmpty()) + passwd = DEFAULT_DB_PASSWORD; + } + } + + if(db==null || db.isEmpty()) { + db = System.getenv("HDB_NAME"); + if (db==null || db.isEmpty()) { + db = System.getProperty("HDB_NAME"); + if (db==null || db.isEmpty()) + db = DEFAULT_DB_NAME; + } + } + + if(port==0) { + String pStr = System.getenv("HDB_MYSQL_PORT"); + if(pStr==null || passwd.isEmpty()) + port = DEFAULT_DB_PORT; + else { + try { + port = (short)Integer.parseInt(pStr); + } catch (NumberFormatException e) { + throw new HdbFailed("Invalid HDB_MYSQL_PORT variable " + e.getMessage()); + } + } + } + + try { + + Properties connectProperties = new Properties(); + connectProperties.setProperty("user", user); + connectProperties.setProperty("password", passwd); + connectProperties.setProperty("loginTimeout", Integer.toString(10)); + connectProperties.setProperty("tcpKeepAlive ", "true"); //Enable TCP keep-alive probe + + // URL example: jdbc:postgresql://host:port/database + String dbURL = DEFAULT_DB_URL_PREFIX + host + ":" + + Integer.toString(port) + "/" + db; + + Connection connection = DriverManager.getConnection(dbURL, connectProperties); + + boolean isLightSchema = checkLightSchema(connection); + if(isLightSchema) + { + return new MySQLLightSchema(dbURL, connection); + } + else + { + return new MySQLSchema(dbURL, connection); + } + + } catch (SQLException e) { + throw new HdbFailed("Failed to connect to MySQL: "+e.getMessage()); + } + } + + + public MySQLSchema(String url, Connection c) throws HdbFailed + { + connection = c; + dbURL = url; + } /** * Connects to a MySQL HDB. @@ -126,7 +232,8 @@ public class MySQLSchema extends HdbReader { * @throws HdbFailed in case of failure */ - public MySQLSchema(String host,String db,String user,String passwd,short port) throws HdbFailed { + public MySQLSchema(String host,String db,String user,String passwd,short port) throws HdbFailed + { if(host==null || host.isEmpty()) { host = System.getenv("HDB_MYSQL_HOST"); @@ -434,7 +541,7 @@ public HdbDataSet findErrors(String attName, // --------------------------------------------------------------------------------------- - private HdbDataSet getArrayData(SignalInfo info, + protected HdbDataSet getArrayData(SignalInfo info, String sigId, String start_date, String stop_date) throws HdbFailed { @@ -575,7 +682,7 @@ private HdbDataSet getArrayData(SignalInfo info, // --------------------------------------------------------------------------------------- - private HdbDataSet getScalarData(SignalInfo info, + protected HdbDataSet getScalarData(SignalInfo info, String sigId, String start_date, String stop_date) throws HdbFailed { @@ -666,7 +773,7 @@ private HdbDataSet getScalarData(SignalInfo info, } - private long timeValue(Timestamp ts) { + protected long timeValue(Timestamp ts) { long ret = ts.getTime(); ret = ret / 1000; @@ -678,7 +785,7 @@ private long timeValue(Timestamp ts) { } - private String toDBDate(String date) { + protected String toDBDate(String date) { // In: 09/07/2015 12:00:00 // Out: 2015-07-09 12:00:00