Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions grafana/rmf-app/pkg/plugin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ type Config struct {
// Custom RMF settings.
CacheSizeRaw string `json:"cacheSize"`
// Legacy custom RMF settings. We should ge rid of these at some point.
Server *string `json:"path"`
Port string `json:"port"`
SSL bool `json:"ssl"`
Username string `json:"userName"`
Password string `json:"password"`
SSLVerify bool `json:"skipVerify"` // NB: the meaning of JSON field is inverted.
Server *string `json:"path"`
Port string `json:"port"`
SSL bool `json:"ssl"`
Username string `json:"userName"`
Password string `json:"password"`
SSLVerify bool `json:"skipVerify"` // NB: the meaning of JSON field is inverted.
OmegamonDs string `json:"omegamonDs"`
}
}

Expand Down
68 changes: 65 additions & 3 deletions grafana/rmf-app/pkg/plugin/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"net/http"
"runtime/debug"
"slices"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -63,6 +64,7 @@ type RMFDatasource struct {
frameCache *cache.FrameCache
ddsClient *dds.Client
single singleflight.Group
omegamonDs string
}

// NewRMFDatasource creates a new instance of the RMF datasource.
Expand All @@ -79,6 +81,7 @@ func NewRMFDatasource(ctx context.Context, settings backend.DataSourceInstanceSe
config.JSON.TlsSkipVerify, config.JSON.DisableCompression)
ds.channelCache = cache.NewChannelCache(ChannelCacheSizeMB)
ds.frameCache = cache.NewFrameCache(config.CacheSize)
ds.omegamonDs = config.JSON.OmegamonDs
logger.Info("initialized a datasource",
"uid", settings.UID, "name", settings.Name,
"url", config.URL, "timeout", config.Timeout, "cacheSize", config.CacheSize,
Expand Down Expand Up @@ -148,13 +151,28 @@ func (ds *RMFDatasource) CallResource(ctx context.Context, req *backend.CallReso
switch req.Path {
// FIXME: it's a contained.xml request for M3 resource tree. Re-factor accordingly.
case "variablequery":
// Extract the query parameter from the POST request
jsonStr := string(req.Body)
varRequest := VariableQueryRequest{}
err := json.Unmarshal([]byte(jsonStr), &varRequest)
err := json.Unmarshal(req.Body, &varRequest)
if err != nil {
return log.ErrorWithId(logger, log.InternalError, "could not unmarshal data", "error", err)
}
q := varRequest.Query
q = strings.Trim(q, " ")
q = strings.ToLower(q)

switch q {
case "sysplex":
data, _ := ds.sysplexContainedJson()
return sender.Send(&backend.CallResourceResponse{Status: http.StatusOK, Body: data})
case "systems":
data, _ := ds.systemsContainedJson()
return sender.Send(&backend.CallResourceResponse{Status: http.StatusOK, Body: data})
case "omegamonds":
data, _ := ds.omegamonContainedJson()
return sender.Send(&backend.CallResourceResponse{Status: http.StatusOK, Body: data})
}

// Extract the query parameter from the POST request
ddsResource := varRequest.Query
if len(strings.TrimSpace(ddsResource)) == 0 {
return log.ErrorWithId(logger, log.InputError, "variable query cannot be blank")
Expand Down Expand Up @@ -184,6 +202,50 @@ func (ds *RMFDatasource) CallResource(ctx context.Context, req *backend.CallReso
}
}

func (ds *RMFDatasource) sysplexContainedJson() ([]byte, error) {
sysplex := ds.ddsClient.GetSysplex()
return toContainedJson([]string{sysplex})
}

func (ds *RMFDatasource) systemsContainedJson() ([]byte, error) {
systems := ds.ddsClient.GetSystems()
slices.Sort(systems)
return toContainedJson(systems)
}

func (ds *RMFDatasource) omegamonContainedJson() ([]byte, error) {
return toContainedJson([]string{ds.omegamonDs})
}

func toContainedJson(resources []string) ([]byte, error) {
type Resource struct {
Reslabel string `json:"reslabel"`
}
type Contained struct {
Resource []Resource `json:"resource"`
}
type ContainedResource struct {
Contained Contained `json:"contained"`
}
type Ddsml struct {
ContainedResourcesList []ContainedResource `json:"containedResourcesList"`
}

contained := Contained{Resource: []Resource{}}
for _, res := range resources {
contained.Resource = append(contained.Resource, Resource{Reslabel: "," + res + ","})
}

var containedResource = ContainedResource{
Contained: contained,
}

result := Ddsml{
ContainedResourcesList: []ContainedResource{containedResource},
}
return json.Marshal(result)
}

type RequestParams struct {
Resource struct {
Value string `json:"value"`
Expand Down
32 changes: 27 additions & 5 deletions grafana/rmf-app/pkg/plugin/dds/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type Client struct {
httpClient *http.Client
headerMap *HeaderMap
timeData *TimeData
resource *Resource
systems []string
useXmlExt atomic.Bool

stopChan chan struct{}
Expand Down Expand Up @@ -118,7 +120,7 @@ func (c *Client) sync() {
}

func (c *Client) updateCache() {
c.updateTimeData()
c.updateMetadata()
c.updateHeaders()
}

Expand Down Expand Up @@ -239,15 +241,15 @@ func (c *Client) ensureTimeData() *TimeData {
timeData := c.timeData
c.rwMutex.RUnlock()
if timeData == nil {
timeData = c.updateTimeData()
timeData = c.updateMetadata()
}
return timeData
}

func (c *Client) updateTimeData() *TimeData {
logger := log.Logger.With("func", "updateTimeData")
func (c *Client) updateMetadata() *TimeData {
logger := log.Logger.With("func", "updateMetadata")
result, _, _ := c.single.Do("timeData", func() (any, error) {
response, err := c.Get(PerformPath, "resource", ",,SYSPLEX", "id", "8D0D50")
response, err := c.Get(PerformPath, "resource", ",,SYSPLEX", "id", "8D0D60")
if err != nil {
logger.Error("unable to fetch DDS time data", "error", err)
return nil, err
Expand All @@ -257,8 +259,15 @@ func (c *Client) updateTimeData() *TimeData {
logger.Error("unable to fetch DDS time data", "error", "no time data in DDS response")
return nil, err
}
resource := response.Reports[0].Resource
if resource == nil {
logger.Error("unable to fetch DDS resource", "error", "no resource data in DDS response")
}
systems := response.Reports[0].GetRowNames()
c.rwMutex.Lock()
c.timeData = timeData
c.resource = resource
c.systems = systems
c.rwMutex.Unlock()
logger.Debug("DDS time data updated")
return timeData, nil
Expand All @@ -277,3 +286,16 @@ func (c *Client) GetCachedMintime() time.Duration {
}
return time.Duration(minTime) * time.Second
}

func (c *Client) GetSysplex() string {
c.ensureTimeData()
if c.resource != nil {
return c.resource.GetName()
}
return ""
}

func (c *Client) GetSystems() []string {
c.ensureTimeData()
return c.systems
}
25 changes: 25 additions & 0 deletions grafana/rmf-app/pkg/plugin/dds/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dds

import (
"fmt"
"strings"
"time"
)

Expand Down Expand Up @@ -55,12 +56,23 @@ type Report struct {
Metric *Metric
Message *Message
Caption Caption
Resource *Resource `json:"resource"`
Headers struct {
Cols []Col `json:"col"`
} `json:"columnHeaders"`
Rows []Row `json:"row"`
}

func (r Report) GetRowNames() []string {
var names []string
if r.Rows != nil {
for _, row := range r.Rows {
names = append(names, row.Cols[0])
}
}
return names
}

type TimeData struct {
// FIXME: don't use these in report headers: they are in DDS timezone. Remove from the mapping.
LocalStart DateTime `json:"localStart"`
Expand All @@ -81,6 +93,19 @@ type TimeData struct {
} `json:"dataRange"`
}

type Resource struct {
Reslabel string `json:"reslabel"`
Restype string `json:"restype"`
}

func (r Resource) GetName() string {
ss := strings.Split(r.Reslabel, ",")
if len(ss) > 1 {
return ss[1]
}
return r.Reslabel
}

type DateTime struct {
time.Time
}
Expand Down
28 changes: 17 additions & 11 deletions grafana/rmf-app/pkg/plugin/frame/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,45 +50,51 @@ func NoDataFrame(t time.Time) *data.Frame {
)
}

func Build(ddsResponse *dds.Response, headers *dds.HeaderMap, wide bool) (*data.Frame, error) {
logger := log.Logger.With("func", "Build")
func validateResponse(ddsResponse *dds.Response) error {
logger := log.Logger.With("func", "validateResponse")

reportsNum := len(ddsResponse.Reports)
if reportsNum == 0 {
return nil, errors.New("no reports in DDS response")
return errors.New("no reports in DDS response")
} else if reportsNum > 1 {
return nil, fmt.Errorf("too many reports (%d) in DDS response", reportsNum)
return fmt.Errorf("too many reports (%d) in DDS response", reportsNum)
}
report := ddsResponse.Reports[0]
if message := report.Message; message != nil {
if _, ok := dds.AcceptableMessages[message.Id]; !ok {
return nil, message
return message
} else {
logger.Debug(message.Error())
}
}
if report.TimeData == nil {
return nil, errors.New("no time data in DDS response")
return errors.New("no time data in DDS response")
}
if report.Metric == nil {
return nil, errors.New("no metric data in DDS response")
return errors.New("no metric data in DDS response")
}
if _, ok := dds.SupportedFormats[report.Metric.Format]; !ok {
return nil, fmt.Errorf("unsupported data format (%s) in DDS response", report.Metric.Format)
return fmt.Errorf("unsupported data format (%s) in DDS response", report.Metric.Format)
}
return nil
}

func Build(ddsResponse *dds.Response, headers *dds.HeaderMap, wide bool) (*data.Frame, error) {
err := validateResponse(ddsResponse)
if err != nil {
return nil, err
}
report := ddsResponse.Reports[0]
format := report.Metric.Format
frameName := strings.Trim(report.Metric.Description, " ")
var newFrame *data.Frame

if format == dds.ReportFormat {
newFrame = buildForReport(&report, headers, frameName)
return buildForReport(&report, headers, frameName), nil
} else if wide {
return buildWideForMetric(&report, frameName), nil
} else {
return buildLongForMetric(&report, frameName), nil
}
return newFrame, nil
}

// buildWideForMetric creates a time series data frame for a metric from pre-parsed DDS response.
Expand Down
Loading
Loading