Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,55 @@ public ZeppelinServer(ZeppelinConfiguration zConf, String serviceLocatorName) th
public void startZeppelin() {
initMetrics();

ContextHandlerCollection contexts = initJettyHandler();
ImmediateErrorHandlerImpl handler = initServiceLocator();
bindZeppelinServices();

setupWebUiContexts(contexts);
initNotebookRepo();
initJMX();
runStartupNote();
registerShutdownHook();

eagerLoadCoreServices();
initializeAndRecoverNotebook();

startJettyServer();
checkServiceLocatorErrors(handler);
waitForJettyTermination();
}

private void initMetrics() {
if (zConf.isJMXEnabled()) {
Metrics.addRegistry(new JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM));
}
if (promMetricRegistry.isPresent()) {
Metrics.addRegistry(promMetricRegistry.get());
}
new ClassLoaderMetrics().bindTo(Metrics.globalRegistry);
new JvmMemoryMetrics().bindTo(Metrics.globalRegistry);
new JvmThreadMetrics().bindTo(Metrics.globalRegistry);
new FileDescriptorMetrics().bindTo(Metrics.globalRegistry);
new ProcessorMetrics().bindTo(Metrics.globalRegistry);
new UptimeMetrics().bindTo(Metrics.globalRegistry);
new JVMInfoBinder().bindTo(Metrics.globalRegistry);
}

private ContextHandlerCollection initJettyHandler() {
TimedHandler timedHandler = new TimedHandler(Metrics.globalRegistry, Tags.empty());
jettyWebServer.setHandler(timedHandler);

ContextHandlerCollection contexts = new ContextHandlerCollection();
timedHandler.setHandler(contexts);
ServiceLocatorUtilities.enableImmediateScope(sharedServiceLocator);
ServiceLocatorUtilities.addClasses(sharedServiceLocator,
ImmediateErrorHandlerImpl.class);
ImmediateErrorHandlerImpl handler = sharedServiceLocator.getService(ImmediateErrorHandlerImpl.class);
return contexts;
}

private ImmediateErrorHandlerImpl initServiceLocator() {
ServiceLocatorUtilities.enableImmediateScope(sharedServiceLocator);
ServiceLocatorUtilities.addClasses(sharedServiceLocator, ImmediateErrorHandlerImpl.class);
return sharedServiceLocator.getService(ImmediateErrorHandlerImpl.class);
}

private void bindZeppelinServices() {
ServiceLocatorUtilities.bind(
sharedServiceLocator,
new AbstractBinder() {
Expand Down Expand Up @@ -216,20 +254,26 @@ protected void configure() {
}
}
});
}

// Multiple Web UI
String newUiWebAppContextPath = isNewUiDefault(zConf) ? zConf.getServerContextPath() : NON_DEFAULT_NEW_UI_WEB_APP_CONTEXT_PATH;
private void setupWebUiContexts(ContextHandlerCollection contexts) {
String newUiWebAppContextPath = isNewUiDefault(zConf) ? zConf.getServerContextPath() : NON_DEFAULT_NEW_UI_WEB_APP_CONTEXT_PATH;
boolean newUiWebAppShouldExist = isNewUiDefault(zConf);

String classicUiWebAppContextPath = !isNewUiDefault(zConf) ? zConf.getServerContextPath() : NON_DEFAULT_CLASSIC_UI_WEB_APP_CONTEXT_PATH;
boolean classicUiWebAppShouldExist = !isNewUiDefault(zConf);

final WebAppContext newUiWebApp = setupWebAppContext(contexts, zConf, zConf.getString(ConfVars.ZEPPELIN_ANGULAR_WAR),
newUiWebAppContextPath, newUiWebAppShouldExist);

final WebAppContext classicUiWebApp = setupWebAppContext(contexts, zConf, zConf.getString(ConfVars.ZEPPELIN_WAR),
classicUiWebAppContextPath, classicUiWebAppShouldExist);

initWebApp(newUiWebApp);
initWebApp(classicUiWebApp);
}

private void initNotebookRepo() {
NotebookRepo repo =
ServiceLocatorUtilities.getService(sharedServiceLocator, NotebookRepo.class.getName());
NoteParser noteParser =
Expand All @@ -239,39 +283,73 @@ protected void configure() {
} catch (IOException e) {
LOGGER.error("Failed to init NotebookRepo", e);
}
}

initJMX();
private void initJMX() {
// JMX Enable
if (zConf.isJMXEnabled()) {
int port = zConf.getJMXPort();
// Setup JMX
MBeanContainer mbeanContainer = new MBeanContainer(ManagementFactory.getPlatformMBeanServer());
jettyWebServer.addBean(mbeanContainer);
JMXServiceURL jmxURL;
try {
jmxURL = new JMXServiceURL(
String.format(
"service:jmx:rmi://0.0.0.0:%d/jndi/rmi://0.0.0.0:%d/jmxrmi",
port, port));
ConnectorServer jmxServer = new ConnectorServer(jmxURL, "org.eclipse.jetty.jmx:name=rmiconnectorserver");
jettyWebServer.addBean(jmxServer);
LOGGER.info("JMX Enabled with port: {}", port);
} catch (MalformedURLException e) {
LOGGER.error("Invalid JMXServiceURL - JMX Disabled", e);
}
}
}

private void runStartupNote() {
runNoteOnStart(sharedServiceLocator);
}

private void registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}

private void eagerLoadCoreServices() {
ServiceLocatorUtilities.getService(sharedServiceLocator, Notebook.class.getName());
ServiceLocatorUtilities.getService(sharedServiceLocator, SearchService.class.getName());
ServiceLocatorUtilities.getService(sharedServiceLocator, SchedulerService.class.getName());
}

private void initializeAndRecoverNotebook() {
// Try to get Notebook from ServiceLocator, because Notebook instantiation is lazy, it is
// created when user open zeppelin in browser if we don't get it explicitly here.
// Lazy loading will cause paragraph recovery and cron job initialization is delayed.
Notebook notebook = ServiceLocatorUtilities.getService(
sharedServiceLocator, Notebook.class.getName());
ServiceLocatorUtilities.getService(
sharedServiceLocator, SearchService.class.getName());
ServiceLocatorUtilities.getService(
sharedServiceLocator, SchedulerService.class.getName());
Notebook notebook = ServiceLocatorUtilities.getService(sharedServiceLocator, Notebook.class.getName());

// Initialization of the Notes in the notebook asynchronously
notebook.initNotebook();
// Try to recover here, don't do it in constructor of Notebook, because it would cause deadlock.
notebook.recoveryIfNecessary();
}

LOGGER.info("Starting zeppelin server");
private void startJettyServer() {
LOGGER.info("Starting Zeppelin server");
/*
* Get a nice Dump after jetty start, quite helpful for debugging
* jettyWebServer.setDumpAfterStart(true);
*/
try {
jettyWebServer.start(); // Instantiates ZeppelinServer
} catch (Exception e) {
LOGGER.error("Error while running jettyServer", e);
}
catch (Exception e) {
LOGGER.error("Error while running Jetty", e);
shutdown(-1);
}
LOGGER.info("Done, Zeppelin server started");
}

LOGGER.info("Done, zeppelin server started");
private void checkServiceLocatorErrors(ImmediateErrorHandlerImpl handler) {
try {
List<ErrorData> errorDatas = handler.waitForAtLeastOneConstructionError(5000);
for (ErrorData errorData : errorDatas) {
Expand All @@ -287,7 +365,9 @@ protected void configure() {
shutdown();
Thread.currentThread().interrupt();
}
}

private void waitForJettyTermination() {
if (jettyWebServer.isStopped() || jettyWebServer.isStopping()) {
LOGGER.debug("jetty server is stopped {} - is stopping {}", jettyWebServer.isStopped(), jettyWebServer.isStopping());
} else {
Expand All @@ -309,43 +389,6 @@ public static void main(String[] args) throws Exception {
}
}

private void initJMX() {
// JMX Enable
if (zConf.isJMXEnabled()) {
int port = zConf.getJMXPort();
// Setup JMX
MBeanContainer mbeanContainer = new MBeanContainer(ManagementFactory.getPlatformMBeanServer());
jettyWebServer.addBean(mbeanContainer);
JMXServiceURL jmxURL;
try {
jmxURL = new JMXServiceURL(
String.format(
"service:jmx:rmi://0.0.0.0:%d/jndi/rmi://0.0.0.0:%d/jmxrmi",
port, port));
ConnectorServer jmxServer = new ConnectorServer(jmxURL, "org.eclipse.jetty.jmx:name=rmiconnectorserver");
jettyWebServer.addBean(jmxServer);
LOGGER.info("JMX Enabled with port: {}", port);
} catch (MalformedURLException e) {
LOGGER.error("Invalid JMXServiceURL - JMX Disabled", e);
}
}
}
private void initMetrics() {
if (zConf.isJMXEnabled()) {
Metrics.addRegistry(new JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM));
}
if (promMetricRegistry.isPresent()) {
Metrics.addRegistry(promMetricRegistry.get());
}
new ClassLoaderMetrics().bindTo(Metrics.globalRegistry);
new JvmMemoryMetrics().bindTo(Metrics.globalRegistry);
new JvmThreadMetrics().bindTo(Metrics.globalRegistry);
new FileDescriptorMetrics().bindTo(Metrics.globalRegistry);
new ProcessorMetrics().bindTo(Metrics.globalRegistry);
new UptimeMetrics().bindTo(Metrics.globalRegistry);
new JVMInfoBinder().bindTo(Metrics.globalRegistry);
}

public void shutdown(int exitCode) {
if (!duringShutdown.getAndSet(true)) {
LOGGER.info("Shutting down Zeppelin Server ... - ExitCode {}", exitCode);
Expand Down
Loading