Skip to content
This repository was archived by the owner on May 28, 2021. It is now read-only.

repair json decode problem #303

Open
wants to merge 62 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
d244fa7
repair update mysqlclusters version field error
Jan 17, 2020
d5ea26a
repair json decode problem
Jan 19, 2020
68aa589
debug
Jan 19, 2020
eb8fbbd
increase timeout
Jan 19, 2020
f00d068
Merge branch 'mysql-operator-jsonrepair' of https://github.com/jiachu…
Feb 14, 2020
04c63a0
update clusterspec
Feb 14, 2020
dc1ed4f
update statefulsets resource
Feb 14, 2020
b2d68f6
add debug info
Feb 14, 2020
4cb581f
add debug info
Feb 14, 2020
9f9821c
update getclusterstatus
Feb 17, 2020
46d39f2
update instance.go
Feb 17, 2020
59c524e
debug statefulsets
Feb 17, 2020
1cfd5d2
debug os env
Feb 17, 2020
a618fab
debug mysql-agent container
Feb 17, 2020
3d732fa
update statefulset.go
Feb 17, 2020
04e2242
update mysql-agent debug level
Feb 17, 2020
a3627bf
update image pull policy
Feb 17, 2020
d6130ce
update port
Feb 17, 2020
fe4a1ad
update statefulsets
Feb 17, 2020
89c4c77
update statefulsets
Feb 17, 2020
4628748
update statefulsets
Feb 17, 2020
c7f55d4
update statefulsets
Feb 17, 2020
3243d65
update statefulsets
Feb 17, 2020
d62e4fd
update statefulsets
Feb 17, 2020
fc23319
update statefulsets
Feb 17, 2020
c4040d5
update log-bin basename
Feb 18, 2020
82825ce
update agent listen port
Feb 18, 2020
a2a05dd
debug change port
Feb 18, 2020
56b10d3
debug string to int
Feb 18, 2020
0e21571
debug string to int
Feb 18, 2020
59e07fe
debug string to int
Feb 18, 2020
bcd9895
debug string to int
Feb 18, 2020
7a1e918
debug cluster struct
Feb 18, 2020
ffb227d
debug liveprobe readiness
Feb 18, 2020
b468866
debug liveprobe readiness
Feb 18, 2020
f6e85d2
debug liveprobe readiness
Feb 18, 2020
cc7c239
debug liveprobe readiness
Feb 18, 2020
62eaa20
debug liveprobe readiness
Feb 18, 2020
d8df60e
debug database not yet
Feb 18, 2020
07549d9
debug database not yet
Feb 18, 2020
0bd80a8
debug database not yet
Feb 18, 2020
184b5b1
debug statefulset port
Feb 18, 2020
7ec556e
debug statefulset port
Feb 18, 2020
77a6e90
debug statefulset port
Feb 18, 2020
6dc2503
debug statefulset port
Feb 18, 2020
3fb9337
debug statefulset port
Feb 18, 2020
d971254
update mysqlsh group port default
Feb 20, 2020
8ecea26
debug cluster_manager
Feb 20, 2020
7736b2d
debug headless service port
Feb 21, 2020
4c282a4
debug headless service port
Feb 21, 2020
c48e20f
debug headless service port
Feb 21, 2020
4102c64
debug headless service port
Feb 21, 2020
c9ce752
debug
Feb 27, 2020
c0407e3
change agent run interval
Feb 27, 2020
1abb5e9
change agent run interval
Feb 27, 2020
cbd24a5
change agent run interval
Feb 27, 2020
df736a9
change agent run interval
Feb 27, 2020
a03ee93
change agent run interval
Feb 27, 2020
3c06957
change agent run interval
Feb 27, 2020
1cb709a
change agent run interval
Feb 27, 2020
66a19c6
change agent run interval
Feb 27, 2020
99fd90c
drop exit-state-action
Mar 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions cmd/mysql-agent/app/mysql_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"strconv"
"sync"
"time"
"os"
"fmt"

"github.com/golang/glog"
"github.com/heptiolabs/healthcheck"
Expand Down Expand Up @@ -100,15 +102,17 @@ func Run(opts *agentopts.MySQLAgentOpts) error {
if err != nil {
return errors.Wrap(err, "failed to create new local MySQL InnoDB cluster manager")
}

// Initialise the agent metrics.
// agent prometheus port
agentPromePort := os.Getenv("AGENT_PROME_PORT")
promeMetricsEndpoint := fmt.Sprintf("0.0.0.0: %s", agentPromePort)
glog.Info("agent prometheus endpoint: ", promeMetricsEndpoint)
metrics.RegisterPodName(opts.Hostname)
metrics.RegisterClusterName(manager.Instance.ClusterName)
clustermgr.RegisterMetrics()
backupcontroller.RegisterMetrics()
restorecontroller.RegisterMetrics()
http.Handle("/metrics", prometheus.Handler())
go http.ListenAndServe(metricsEndpoint, nil)
go http.ListenAndServe(promeMetricsEndpoint, nil)

// Block until local instance successfully initialised.
for !manager.Sync(ctx) {
Expand Down
23 changes: 23 additions & 0 deletions pkg/apis/mysql/v1alpha1/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ import (
const (
// DefaultVersion is the MySQL version to use if not specified explicitly by user
DefaultVersion = "8.0.12"
DefaultReplicationGroupPort = 33061
DefaultAgentHealthCheckPort = 10512
DefaultAgentPromePort = 8080
DefaultMysqlPort = 3306
defaultMembers = 3
defaultBaseServerID = 1000
defaultAgentIntervalTime = 15
// maxBaseServerID is the maximum safe value for BaseServerID calculated
// as max MySQL server_id value - max Replication Group size.
maxBaseServerID uint32 = 4294967295 - 9
Expand Down Expand Up @@ -69,6 +74,24 @@ func (c *Cluster) EnsureDefaults() *Cluster {
c.Spec.Version = DefaultVersion
}

if c.Spec.GroupPort == 0 {
c.Spec.GroupPort = DefaultReplicationGroupPort
}

if c.Spec.AgentCheckPort == 0 {
c.Spec.AgentCheckPort = DefaultAgentHealthCheckPort
}

if c.Spec.AgentPromePort == 0 {
c.Spec.AgentPromePort = DefaultAgentPromePort
}

if c.Spec.MysqlPort == 0 {
c.Spec.MysqlPort = DefaultMysqlPort
}
if c.Spec.AgentIntervalTime == 0 {
c.Spec.AgentIntervalTime = defaultAgentIntervalTime
}
return c
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/apis/mysql/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ type ClusterSpec struct {
// all instances will be R/W. If false (the default), only a single instance
// will be R/W and the rest will be R/O.
MultiMaster bool `json:"multiMaster,omitempty"`
//hostnetwork
HostNetwork bool `json:"hostNetwork,omitempty"`
//replicationGroupPort
GroupPort uint32 `json:"groupPort,omitempty"`
//agent healthcheck port
AgentCheckPort uint32 `json:"agentCheckPort,omitempty"`
//agent prometheus port
AgentPromePort uint32 `json:"agentPromePort,omitempty"`
//mysql_port
MysqlPort uint32 `json:"mysqlPort,omitempty"`
//agent execute interval
AgentIntervalTime uint32 `json:"agentIntervalTime,omitempty"`
// NodeSelector is a selector which must be true for the pod to fit on a node.
// Selector which must match a node's labels for the pod to be scheduled on that node.
// More info: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
Expand Down
3 changes: 2 additions & 1 deletion pkg/cluster/innodb/innodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package innodb
import (
"fmt"
"net"
"os"
)

// DefaultClusterName is the default name assigned to InnoDB clusters created by
Expand Down Expand Up @@ -123,7 +124,7 @@ func (s *ClusterStatus) GetInstanceStatus(name string) InstanceStatus {
if s.DefaultReplicaSet.Topology == nil {
return InstanceStatusNotFound
}
if is, ok := s.DefaultReplicaSet.Topology[fmt.Sprintf("%s:%d", name, MySQLDBPort)]; ok {
if is, ok := s.DefaultReplicaSet.Topology[fmt.Sprintf("%s:%s", name, os.Getenv("MYSQL_PORT"))]; ok {
return is.Status
}
return InstanceStatusNotFound
Expand Down
16 changes: 9 additions & 7 deletions pkg/cluster/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"github.com/pkg/errors"

"github.com/oracle/mysql-operator/pkg/cluster/innodb"
//"github.com/oracle/mysql-operator/pkg/cluster/innodb"
)

// Instance represents the local MySQL instance.
Expand Down Expand Up @@ -62,18 +62,19 @@ func NewInstance(namespace, clusterName, parentName string, ordinal, port int, m
// NewLocalInstance creates a new instance of this structure, with it's name and index
// populated from os.Hostname().
func NewLocalInstance() (*Instance, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
pod_name := os.Getenv("MY_POD_NAME")
if pod_name == "" {
return nil, errors.Errorf("env MY_POD_NAME is empty!!!")
}
name, ordinal := GetParentNameAndOrdinal(hostname)
name, ordinal := GetParentNameAndOrdinal(pod_name)
multiMaster, _ := strconv.ParseBool(os.Getenv("MYSQL_CLUSTER_MULTI_MASTER"))
mysqlPort, _ := strconv.ParseInt(os.Getenv("MYSQL_PORT"), 10, 32)
return &Instance{
Namespace: os.Getenv("POD_NAMESPACE"),
ClusterName: os.Getenv("MYSQL_CLUSTER_NAME"),
ParentName: name,
Ordinal: ordinal,
Port: innodb.MySQLDBPort,
Port: int(mysqlPort),
MultiMaster: multiMaster,
IP: net.ParseIP(os.Getenv("MY_POD_IP")),
}, nil
Expand All @@ -90,12 +91,13 @@ func NewInstanceFromGroupSeed(seed string) (*Instance, error) {
// MySQLDB port not its group replication port.
parentName, ordinal := GetParentNameAndOrdinal(podName)
multiMaster, _ := strconv.ParseBool(os.Getenv("MYSQL_CLUSTER_MULTI_MASTER"))
mysqlPort, _ := strconv.ParseInt(os.Getenv("MYSQL_PORT"), 10, 32)
return &Instance{
ClusterName: os.Getenv("MYSQL_CLUSTER_NAME"),
Namespace: os.Getenv("POD_NAMESPACE"),
ParentName: parentName,
Ordinal: ordinal,
Port: innodb.MySQLDBPort,
Port: int(mysqlPort),
MultiMaster: multiMaster,
}, nil
}
Expand Down
27 changes: 25 additions & 2 deletions pkg/controllers/cluster/manager/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package manager
import (
"context"
"fmt"
"os"
"strings"
"strconv"
"time"

"github.com/golang/glog"
Expand All @@ -36,7 +38,7 @@ import (
"github.com/oracle/mysql-operator/pkg/util/mysqlsh"
)

const pollingIntervalSeconds = 15
const pollingIntervalSeconds = 60

// ClusterManager manages the local MySQL instance's membership of an InnoDB cluster.
type ClusterManager struct {
Expand Down Expand Up @@ -100,6 +102,7 @@ func (m *ClusterManager) getClusterStatus(ctx context.Context) (*innodb.ClusterS
if localMSHErr != nil {
var err error
clusterStatus, err = getClusterStatusFromGroupSeeds(ctx, m.kubeClient, m.Instance)
glog.V(2).Infof("get cluster seeds*** error: %+v", err)
if err != nil {
// NOTE: We return the localMSHErr rather than the error here so that we
// can dispatch on it.
Expand All @@ -122,6 +125,7 @@ func (m *ClusterManager) Sync(ctx context.Context) bool {
clusterStatus, err := m.getClusterStatus(ctx)
if err != nil {
myshErr, ok := errors.Cause(err).(*mysqlsh.Error)
glog.V(2).Infof("get cluster*** error: %+v", err)
if !ok {
glog.Errorf("Failed to get the cluster status: %+v", err)
return false
Expand Down Expand Up @@ -289,10 +293,17 @@ func (m *ClusterManager) handleInstanceNotFound(ctx context.Context, primaryAddr
glog.Errorf("Getting CIDR to whitelist for GR: %v", err)
return false
}
//use deault mysqlPort + 1
//localAddress := fmt.Sprintf("%s:%s", m.Instance.Name(), os.Getenv("GROUP_PORT"))
//groupSeeds := os.Getenv("REPLICATION_GROUP_SEEDS")

//glog.Infof("localAddress: %s, groupSeeds: %s", localAddress, groupSeeds)

if err := psh.AddInstanceToCluster(ctx, m.Instance.GetShellURI(), mysqlsh.Options{
"memberSslMode": "REQUIRED",
"ipWhitelist": whitelistCIDR,
// "localAddress": localAddress,
// "groupSeeds": groupSeeds,
}); err != nil {
glog.Errorf("Failed to add to cluster: %v", err)
return false
Expand Down Expand Up @@ -322,9 +333,18 @@ func (m *ClusterManager) createCluster(ctx context.Context) (*innodb.ClusterStat
if err != nil {
return nil, errors.Wrap(err, "getting CIDR to whitelist for GR")
}

// use deault mysql_port + 1
//localAddress := fmt.Sprintf("%s:%s", m.Instance.Name(), os.Getenv("GROUP_PORT"))
//groupSeeds := os.Getenv("REPLICATION_GROUP_SEEDS")

//glog.Infof("localAddress: %s, groupSeeds: %s", localAddress, groupSeeds)

opts := mysqlsh.Options{
"memberSslMode": "REQUIRED",
"ipWhitelist": whitelistCIDR,
// "localAddress": localAddress,
// "groupSeeds": groupSeeds,
}
if m.Instance.MultiMaster {
opts["force"] = "True"
Expand Down Expand Up @@ -355,7 +375,10 @@ func (m *ClusterManager) rebootFromOutage(ctx context.Context) (*innodb.ClusterS
// Run runs the ClusterManager controller.
// NOTE: ctx is not currently used for cancellation by caller (the stopCh is).
func (m *ClusterManager) Run(ctx context.Context) {
wait.Until(func() { m.Sync(ctx) }, time.Second*pollingIntervalSeconds, ctx.Done())
interval_time, _ := strconv.ParseUint(os.Getenv("AGENT_INTERVAL"), 10, 32)
glog.Info("***agent run interval: ", interval_time)
//wait.Until(func() { m.Sync(ctx) }, time.Second*pollingIntervalSeconds, ctx.Done())
wait.Until(func() { m.Sync(ctx) }, time.Second*time.Duration(interval_time), ctx.Done())

<-ctx.Done()

Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/cluster/manager/innodb_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (

var errNoClusterFound = errors.New("no cluster found on any of the seed nodes")

const defaultTimeout = 10 * time.Second
const defaultTimeout = 60 * time.Second

// isDatabaseRunning returns true if a connection can be made to the MySQL
// database running in the pod instance in which this function is called.
Expand All @@ -46,6 +46,7 @@ func isDatabaseRunning(ctx context.Context) bool {
"mysqladmin",
"--protocol", "tcp",
"-u", "root",
os.ExpandEnv("-P$MYSQL_PORT"),
os.ExpandEnv("-p$MYSQL_ROOT_PASSWORD"),
"status",
).Run()
Expand Down
5 changes: 4 additions & 1 deletion pkg/options/agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"os"
"time"
"strconv"

"github.com/golang/glog"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -65,8 +66,10 @@ func NewMySQLAgentOpts() *MySQLAgentOpts {
}
namespace := os.Getenv("POD_NAMESPACE")
clusterName := os.Getenv("MYSQL_CLUSTER_NAME")
healthcheckPort, _ := strconv.ParseInt(os.Getenv("AGENT_HEALTHCHECK_PORT"), 10, 32)
glog.V(2).Infof("mysql-agent healthcheckPort: %d", healthcheckPort)
return &MySQLAgentOpts{
HealthcheckPort: DefaultMySQLAgentHeathcheckPort,
HealthcheckPort: int32(healthcheckPort),
Address: "0.0.0.0",
Namespace: namespace,
ClusterName: clusterName,
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

// NewForCluster will return a new headless Kubernetes service for a MySQL cluster
func NewForCluster(cluster *v1alpha1.Cluster) *corev1.Service {
mysqlPort := corev1.ServicePort{Port: 3306}
mysqlPort := corev1.ServicePort{Port: int32(cluster.Spec.MysqlPort)}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{constants.ClusterLabel: cluster.Name},
Expand Down
Loading