Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster Read only option, Java 9+ ambitious method resolution issue with Properties and HashTable #642

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
6 changes: 6 additions & 0 deletions app/controllers/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
, "jmxUser" -> optional(text)
, "jmxPass" -> optional(text)
, "jmxSsl" -> boolean
, "restrictOperations" -> boolean
, "pollConsumers" -> boolean
, "filterConsumers" -> boolean
, "logkafkaEnabled" -> boolean
Expand Down Expand Up @@ -137,6 +138,7 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
"jmxUser" -> optional(text),
"jmxPass" -> optional(text),
"jmxSsl" -> boolean,
"restrictOperations" -> boolean,
"pollConsumers" -> boolean,
"filterConsumers" -> boolean,
"logkafkaEnabled" -> boolean,
Expand Down Expand Up @@ -185,6 +187,7 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
,false
,false
,false
,false
,Option(defaultTuning)
,PLAINTEXT
,None
Expand Down Expand Up @@ -230,6 +233,7 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
cc.jmxUser,
cc.jmxPass,
cc.jmxSsl,
cc.restrictOperations,
cc.pollConsumers,
cc.filterConsumers,
cc.logkafkaEnabled,
Expand Down Expand Up @@ -258,6 +262,7 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
clusterConfig.jmxUser,
clusterConfig.jmxPass,
clusterConfig.jmxSsl,
clusterConfig.restrictOperations,
clusterConfig.pollConsumers,
clusterConfig.filterConsumers,
clusterConfig.tuning,
Expand Down Expand Up @@ -329,6 +334,7 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
clusterOperation.clusterConfig.jmxUser,
clusterOperation.clusterConfig.jmxPass,
clusterOperation.clusterConfig.jmxSsl,
clusterOperation.clusterConfig.restrictOperations,
clusterOperation.clusterConfig.pollConsumers,
clusterOperation.clusterConfig.filterConsumers,
clusterOperation.clusterConfig.tuning,
Expand Down
4 changes: 4 additions & 0 deletions app/kafka/manager/KafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ class KafkaManager(akkaConfig: Config) extends Logging {
jmxUser: Option[String],
jmxPass: Option[String],
jmxSsl: Boolean,
restrictOperations: Boolean,
pollConsumers: Boolean,
filterConsumers: Boolean,
tuning: Option[ClusterTuning],
Expand All @@ -281,6 +282,7 @@ class KafkaManager(akkaConfig: Config) extends Logging {
jmxUser = jmxUser,
jmxPass = jmxPass,
jmxSsl = jmxSsl,
restrictOperations = restrictOperations,
pollConsumers = pollConsumers,
filterConsumers = filterConsumers,
logkafkaEnabled = logkafkaEnabled,
Expand All @@ -298,6 +300,7 @@ class KafkaManager(akkaConfig: Config) extends Logging {
jmxUser: Option[String],
jmxPass: Option[String],
jmxSsl: Boolean,
restrictOperations: Boolean,
pollConsumers: Boolean,
filterConsumers: Boolean,
tuning: Option[ClusterTuning],
Expand All @@ -320,6 +323,7 @@ class KafkaManager(akkaConfig: Config) extends Logging {
jmxUser = jmxUser,
jmxPass = jmxPass,
jmxSsl = jmxSsl,
restrictOperations = restrictOperations,
pollConsumers = pollConsumers,
filterConsumers = filterConsumers,
logkafkaEnabled = logkafkaEnabled,
Expand Down
1 change: 1 addition & 0 deletions app/kafka/manager/actor/KafkaManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
&& newConfig.jmxPass == currentConfig.jmxPass
&& newConfig.jmxSsl == currentConfig.jmxSsl
&& newConfig.logkafkaEnabled == currentConfig.logkafkaEnabled
&& newConfig.restrictOperations == currentConfig.restrictOperations
&& newConfig.pollConsumers == currentConfig.pollConsumers
&& newConfig.filterConsumers == currentConfig.filterConsumers
&& newConfig.activeOffsetCacheEnabled == currentConfig.activeOffsetCacheEnabled
Expand Down
348 changes: 185 additions & 163 deletions app/kafka/manager/actor/cluster/KafkaStateActor.scala

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions app/kafka/manager/features/KMFeature.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ sealed trait ClusterFeature extends KMFeature

case object KMLogKafkaFeature extends ClusterFeature
case object KMDeleteTopicFeature extends ClusterFeature
case object KMRestrictedFeature extends ClusterFeature
case object KMJMXMetricsFeature extends ClusterFeature
case object KMDisplaySizeFeature extends ClusterFeature
case object KMPollConsumersFeature extends ClusterFeature
Expand Down Expand Up @@ -73,6 +74,9 @@ object ClusterFeatures {
if(clusterConfig.pollConsumers)
buffer+=KMPollConsumersFeature

if(clusterConfig.restrictOperations)
buffer+=KMRestrictedFeature

ClusterFeatures(buffer.toSet)
}
}
10 changes: 8 additions & 2 deletions app/kafka/manager/model/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ object ClusterConfig {
, jmxUser: Option[String]
, jmxPass: Option[String]
, jmxSsl: Boolean
, restrictOperations:Boolean
, pollConsumers: Boolean
, filterConsumers: Boolean
, logkafkaEnabled: Boolean = false
Expand All @@ -210,6 +211,7 @@ object ClusterConfig {
, jmxUser
, jmxPass
, jmxSsl
, restrictOperations
, pollConsumers
, filterConsumers
, logkafkaEnabled
Expand All @@ -223,10 +225,10 @@ object ClusterConfig {
}

def customUnapply(cc: ClusterConfig) : Option[(
String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
Some((
cc.name, cc.version.toString, cc.curatorConfig.zkConnect, cc.curatorConfig.zkMaxRetry,
cc.jmxEnabled, cc.jmxUser, cc.jmxPass, cc.jmxSsl, cc.pollConsumers, cc.filterConsumers,
cc.jmxEnabled, cc.jmxUser, cc.jmxPass, cc.jmxSsl, cc.restrictOperations, cc.pollConsumers, cc.filterConsumers,
cc.logkafkaEnabled, cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning, cc.securityProtocol.stringId, cc.saslMechanism.map(_.stringId), cc.jaasConfig
)
)
Expand Down Expand Up @@ -264,6 +266,7 @@ object ClusterConfig {
:: ("jmxUser" -> toJSON(config.jmxUser))
:: ("jmxPass" -> toJSON(config.jmxPass))
:: ("jmxSsl" -> toJSON(config.jmxSsl))
:: ("restrictOperations" -> toJSON(config.restrictOperations))
:: ("pollConsumers" -> toJSON(config.pollConsumers))
:: ("filterConsumers" -> toJSON(config.filterConsumers))
:: ("logkafkaEnabled" -> toJSON(config.logkafkaEnabled))
Expand All @@ -290,6 +293,7 @@ object ClusterConfig {
val jmxUser = fieldExtended[Option[String]]("jmxUser")(json)
val jmxPass = fieldExtended[Option[String]]("jmxPass")(json)
val jmxSsl = fieldExtended[Boolean]("jmxSsl")(json)
val restrictOperations = fieldExtended[Boolean]("restrictOperations")(json)
val pollConsumers = fieldExtended[Boolean]("pollConsumers")(json)
val filterConsumers = fieldExtended[Boolean]("filterConsumers")(json)
val logkafkaEnabled = fieldExtended[Boolean]("logkafkaEnabled")(json)
Expand All @@ -310,6 +314,7 @@ object ClusterConfig {
jmxUser.getOrElse(None),
jmxPass.getOrElse(None),
jmxSsl.getOrElse(false),
restrictOperations.getOrElse(false),
pollConsumers.getOrElse(false),
filterConsumers.getOrElse(true),
logkafkaEnabled.getOrElse(false),
Expand Down Expand Up @@ -442,6 +447,7 @@ case class ClusterConfig (name: String
, jmxUser: Option[String]
, jmxPass: Option[String]
, jmxSsl: Boolean
, restrictOperations:Boolean
, pollConsumers: Boolean
, filterConsumers: Boolean
, logkafkaEnabled: Boolean
Expand Down
3 changes: 2 additions & 1 deletion app/kafka/manager/utils/logkafka81/LogConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ object LogConfig extends LogkafkaNewConfigs {
*/
def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
val props = new Properties(defaults)
props.putAll(overrides)
//to handle to Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
overrides.forEach((k, v) => props.put(k, v))
fromProps(props)
}

Expand Down
3 changes: 2 additions & 1 deletion app/kafka/manager/utils/logkafka82/LogConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ object LogConfig extends LogkafkaNewConfigs {
*/
def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
val props = new Properties(defaults)
props.putAll(overrides)
//to handle to Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
overrides.forEach((k, v) => props.put(k, v))
fromProps(props)
}

Expand Down
5 changes: 3 additions & 2 deletions app/kafka/manager/utils/zero10/LogConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,9 @@ object LogConfig extends TopicConfigs {
*/
def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = {
val props = new Properties()
props.putAll(defaults)
props.putAll(overrides)
//to handle to Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
defaults.forEach((k, v) => props.put(k, v))
overrides.forEach((k, v) => props.put(k, v))
LogConfig(props)
}

Expand Down
5 changes: 3 additions & 2 deletions app/kafka/manager/utils/zero11/LogConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,9 @@ object LogConfig extends TopicConfigs {
*/
def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = {
val props = new Properties()
props.putAll(defaults)
props.putAll(overrides)
//to handle to Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
defaults.forEach((k, v) => props.put(k, v))
overrides.forEach((k, v) => props.put(k, v))
LogConfig(props)
}

Expand Down
3 changes: 2 additions & 1 deletion app/kafka/manager/utils/zero81/LogConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ object LogConfig extends TopicConfigs {
*/
def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
val props = new Properties(defaults)
props.putAll(overrides)
//to handle to Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
overrides.forEach((k, v) => props.put(k, v))
fromProps(props)
}

Expand Down
3 changes: 2 additions & 1 deletion app/kafka/manager/utils/zero82/LogConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ object LogConfig extends TopicConfigs {
*/
def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
val props = new Properties(defaults)
props.putAll(overrides)
//to handle to Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
overrides.forEach((k, v) => props.put(k, v))
fromProps(props)
}

Expand Down
5 changes: 3 additions & 2 deletions app/kafka/manager/utils/zero90/LogConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,9 @@ object LogConfig extends TopicConfigs {
*/
def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = {
val props = new Properties()
props.putAll(defaults)
props.putAll(overrides)
//to handle to Scala bug with Java 9+ versions https://github.com/scala/bug/issues/10418
defaults.forEach((k, v) => props.put(k, v))
overrides.forEach((k, v) => props.put(k, v))
LogConfig(props)
}

Expand Down
7 changes: 4 additions & 3 deletions app/models/form/ClusterOperation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ object ClusterOperation {
, jmxUser: Option[String]
, jmxPass: Option[String]
, jmxSsl: Boolean
, restrictOperations: Boolean
, pollConsumers: Boolean
, filterConsumers: Boolean
, logkafkaEnabled: Boolean
Expand All @@ -50,14 +51,14 @@ object ClusterOperation {
, jaasConfig: Option[String]
): ClusterOperation = {
ClusterOperation(operation,ClusterConfig(name, version, zkHosts, zkMaxRetry, jmxEnabled, jmxUser, jmxPass, jmxSsl,
pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning, securityProtocol, saslMechanism, jaasConfig))
restrictOperations,pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning, securityProtocol, saslMechanism, jaasConfig))
}

def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
Option((co.op.toString, co.clusterConfig.name, co.clusterConfig.version.toString,
co.clusterConfig.curatorConfig.zkConnect, co.clusterConfig.curatorConfig.zkMaxRetry,
co.clusterConfig.jmxEnabled, co.clusterConfig.jmxUser, co.clusterConfig.jmxPass, co.clusterConfig.jmxSsl,
co.clusterConfig.pollConsumers, co.clusterConfig.filterConsumers, co.clusterConfig.logkafkaEnabled,
co.clusterConfig.restrictOperations,co.clusterConfig.pollConsumers, co.clusterConfig.filterConsumers, co.clusterConfig.logkafkaEnabled,
co.clusterConfig.activeOffsetCacheEnabled, co.clusterConfig.displaySizeEnabled, co.clusterConfig.tuning, co.clusterConfig.securityProtocol.stringId,
co.clusterConfig.saslMechanism.map(_.stringId),
co.clusterConfig.jaasConfig))
Expand Down
8 changes: 4 additions & 4 deletions app/models/navigation/Menus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package models.navigation

import features.{KMTopicManagerFeature, KMClusterManagerFeature, KMPreferredReplicaElectionFeature, KMReassignPartitionsFeature, ApplicationFeatures}
import kafka.manager.features.{KMLogKafkaFeature, ClusterFeatures}
import kafka.manager.features.{KMLogKafkaFeature, ClusterFeatures, KMRestrictedFeature}

/**
* @author hiral
Expand All @@ -27,11 +27,11 @@ class Menus(implicit applicationFeatures: ApplicationFeatures) {
Option(Menu("Cluster", items, None))
}

private[this] def topicMenu(cluster: String) : Option[Menu] = {
private[this] def topicMenu(cluster: String, clusterFeatures: ClusterFeatures) : Option[Menu] = {
val defaultItems = IndexedSeq("List".clusterRouteMenuItem(cluster))

val items = {
if(applicationFeatures.features(KMTopicManagerFeature))
if(applicationFeatures.features(KMTopicManagerFeature) && !clusterFeatures.features(KMRestrictedFeature))
defaultItems.+:("Create".clusterRouteMenuItem(cluster))
else
defaultItems
Expand Down Expand Up @@ -75,7 +75,7 @@ class Menus(implicit applicationFeatures: ApplicationFeatures) {
IndexedSeq(
clusterMenu(cluster),
brokersMenu(cluster),
topicMenu(cluster),
topicMenu(cluster, clusterFeatures),
preferredReplicaElectionMenu(cluster),
reassignPartitionsMenu(cluster),
consumersMenu(cluster),
Expand Down
1 change: 1 addition & 0 deletions app/views/cluster/addCluster.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
@b4.text(form("jmxUser"), '_label -> "JMX Auth Username")
@b4.text(form("jmxPass"), '_label -> "JMX Auth Password")
@b4.checkbox(form("jmxSsl"), '_text -> "JMX with SSL")
@b4.checkbox(form("restrictOperations"), '_text -> "Restrict the Operations(Non-Disruptive View only mode)")
@b4.checkbox(form("logkafkaEnabled"), '_text -> "Enable Logkafka")
@b4.checkbox(form("pollConsumers"), '_text -> "Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions)")
@b4.checkbox(form("filterConsumers"), '_text -> "Filter out inactive consumers")
Expand Down
1 change: 1 addition & 0 deletions app/views/cluster/updateCluster.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
@b4.text(form("jmxUser"), '_label -> "JMX Auth Username")
@b4.text(form("jmxPass"), '_label -> "JMX Auth Password")
@b4.checkbox(form("jmxSsl"), '_text -> "JMX with SSL")
@b4.checkbox(form("restrictOperations"), '_text -> "Restrict the Operations(Non-Disruptive View only mode)")
@b4.checkbox(form("pollConsumers"), '_text -> "Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions)")
@b4.checkbox(form("filterConsumers"), '_text -> "Filter out inactive consumers")
@b4.checkbox(form("logkafkaEnabled"), '_text -> "Enable Logkafka")
Expand Down
43 changes: 27 additions & 16 deletions app/views/topic/topicList.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,32 @@
}
}

@renderOperations = {

@if(errorOrTopics.fold(err=>false,tl=>tl.list.headOption.map(opt => opt._2.map(ti => ti.clusterContext.config.restrictOperations && ti.clusterContext.config.name.equals(cluster)).getOrElse(false)).getOrElse(false))){
<div class="alert alert-info" role="alert">
Operations are restricted for @cluster Cluster (Enable at the cluster configurations)
</div>
} else {
<table class="table">
<tr>
@features.app(features.KMReassignPartitionsFeature) {
<td>
<a href="@routes.ReassignPartitions.confirmMultipleAssignments(cluster)" class="submit-button btn btn-primary" role="button">Generate Partition Assignments</a>
</td>
<td>
<a href="@routes.ReassignPartitions.runMultipleAssignments(cluster)" class="submit-button btn btn-primary" role="button">Run Partition Assignments</a>
</td>
}
<td>
<a href="@routes.Topic.addPartitionsToMultipleTopics(cluster)" class="submit-button btn btn-primary" role="button">Add Partitions</a>
</td>
</tr>
</tbody>
</table>
}
}

@main(
"Topic List",
menu = theMenu,
Expand All @@ -48,22 +74,7 @@
<div class="card-header"><h4>Operations</h4></div>
<div class="card-body">
<div class="col-md-6">
<table class="table">
<tr>
@features.app(features.KMReassignPartitionsFeature) {
<td>
<a href="@routes.ReassignPartitions.confirmMultipleAssignments(cluster)" class="submit-button btn btn-primary" role="button">Generate Partition Assignments</a>
</td>
<td>
<a href="@routes.ReassignPartitions.runMultipleAssignments(cluster)" class="submit-button btn btn-primary" role="button">Run Partition Assignments</a>
</td>
}
<td>
<a href="@routes.Topic.addPartitionsToMultipleTopics(cluster)" class="submit-button btn btn-primary" role="button">Add Partitions</a>
</td>
</tr>
</tbody>
</table>
@renderOperations
</div>
</div>
</div>
Expand Down
Loading