1
- use std:: { mem , path:: Path , sync:: Arc } ;
1
+ use std:: { path:: Path , sync:: Arc } ;
2
2
3
3
use anyhow:: { Context , Result } ;
4
4
use clap:: Parser ;
@@ -13,14 +13,23 @@ use graph::{
13
13
} ;
14
14
use graph_core:: polling_monitor:: ipfs_service;
15
15
use graph_node:: {
16
+ dev:: watcher,
16
17
dev:: watcher:: { parse_manifest_args, watch_subgraphs} ,
17
18
launcher,
18
19
opt:: Opt ,
19
20
} ;
20
21
use lazy_static:: lazy_static;
21
22
22
23
#[ cfg( unix) ]
23
- use pgtemp:: PgTempDBBuilder ;
24
+ use pgtemp:: { PgTempDB , PgTempDBBuilder } ;
25
+
26
+ // Add an alias for the temporary Postgres DB handle. On non unix
27
+ // targets we don’t have pgtemp, but we still need the type to satisfy the
28
+ // function signatures.
29
+ #[ cfg( unix) ]
30
+ type TempPgDB = PgTempDB ;
31
+ #[ cfg( not( unix) ) ]
32
+ type TempPgDB = ( ) ;
24
33
25
34
git_testament ! ( TESTAMENT ) ;
26
35
lazy_static ! {
@@ -90,6 +99,35 @@ pub struct DevOpt {
90
99
default_value = "https://api.thegraph.com/ipfs"
91
100
) ]
92
101
pub ipfs : Vec < String > ,
102
+ #[ clap(
103
+ long,
104
+ default_value = "8000" ,
105
+ value_name = "PORT" ,
106
+ help = "Port for the GraphQL HTTP server" ,
107
+ env = "GRAPH_GRAPHQL_HTTP_PORT"
108
+ ) ]
109
+ pub http_port : u16 ,
110
+ #[ clap(
111
+ long,
112
+ default_value = "8030" ,
113
+ value_name = "PORT" ,
114
+ help = "Port for the index node server"
115
+ ) ]
116
+ pub index_node_port : u16 ,
117
+ #[ clap(
118
+ long,
119
+ default_value = "8020" ,
120
+ value_name = "PORT" ,
121
+ help = "Port for the JSON-RPC admin server"
122
+ ) ]
123
+ pub admin_port : u16 ,
124
+ #[ clap(
125
+ long,
126
+ default_value = "8040" ,
127
+ value_name = "PORT" ,
128
+ help = "Port for the Prometheus metrics server"
129
+ ) ]
130
+ pub metrics_port : u16 ,
93
131
}
94
132
95
133
/// Builds the Graph Node options from DevOpt
@@ -109,7 +147,12 @@ fn build_args(dev_opt: &DevOpt, db_url: &str) -> Result<Opt> {
109
147
args. push ( "--postgres-url" . to_string ( ) ) ;
110
148
args. push ( db_url. to_string ( ) ) ;
111
149
112
- let opt = Opt :: parse_from ( args) ;
150
+ let mut opt = Opt :: parse_from ( args) ;
151
+
152
+ opt. http_port = dev_opt. http_port ;
153
+ opt. admin_port = dev_opt. admin_port ;
154
+ opt. metrics_port = dev_opt. metrics_port ;
155
+ opt. index_node_port = dev_opt. index_node_port ;
113
156
114
157
Ok ( opt)
115
158
}
@@ -118,7 +161,7 @@ async fn run_graph_node(
118
161
logger : & Logger ,
119
162
opt : Opt ,
120
163
link_resolver : Arc < dyn LinkResolver > ,
121
- subgraph_updates_channel : Option < mpsc:: Receiver < ( DeploymentHash , SubgraphName ) > > ,
164
+ subgraph_updates_channel : mpsc:: Receiver < ( DeploymentHash , SubgraphName ) > ,
122
165
) -> Result < ( ) > {
123
166
let env_vars = Arc :: new ( EnvVars :: from_env ( ) . context ( "Failed to load environment variables" ) ?) ;
124
167
@@ -139,16 +182,19 @@ async fn run_graph_node(
139
182
env_vars,
140
183
ipfs_service,
141
184
link_resolver,
142
- subgraph_updates_channel,
185
+ Some ( subgraph_updates_channel) ,
143
186
)
144
187
. await ;
145
188
Ok ( ( ) )
146
189
}
147
190
148
191
/// Get the database URL, either from the provided option or by creating a temporary database
149
- fn get_database_url ( postgres_url : Option < & String > , database_dir : & Path ) -> Result < String > {
192
+ fn get_database_url (
193
+ postgres_url : Option < & String > ,
194
+ database_dir : & Path ,
195
+ ) -> Result < ( String , Option < TempPgDB > ) > {
150
196
if let Some ( url) = postgres_url {
151
- Ok ( url. clone ( ) )
197
+ Ok ( ( url. clone ( ) , None ) )
152
198
} else {
153
199
#[ cfg( unix) ]
154
200
{
@@ -162,13 +208,14 @@ fn get_database_url(postgres_url: Option<&String>, database_dir: &Path) -> Resul
162
208
163
209
let db = PgTempDBBuilder :: new ( )
164
210
. with_data_dir_prefix ( database_dir)
165
- . with_initdb_param ( "-E" , "UTF8" )
166
- . with_initdb_param ( "--locale" , "C" )
211
+ . persist_data ( false )
212
+ . with_initdb_arg ( "-E" , "UTF8" )
213
+ . with_initdb_arg ( "--locale" , "C" )
167
214
. start ( ) ;
168
215
let url = db. connection_uri ( ) . to_string ( ) ;
169
- // Prevent the database from being dropped by forgetting it
170
- mem :: forget ( db ) ;
171
- Ok ( url)
216
+ // Return the handle so it lives for the lifetime of the program; dropping it will
217
+ // shut down Postgres and remove the temporary directory automatically.
218
+ Ok ( ( url, Some ( db ) ) )
172
219
}
173
220
174
221
#[ cfg( not( unix) ) ]
@@ -182,49 +229,76 @@ fn get_database_url(postgres_url: Option<&String>, database_dir: &Path) -> Resul
182
229
183
230
#[ tokio:: main]
184
231
async fn main ( ) -> Result < ( ) > {
232
+ std:: env:: set_var ( "ETHEREUM_REORG_THRESHOLD" , "10" ) ;
233
+ std:: env:: set_var ( "GRAPH_NODE_DISABLE_DEPLOYMENT_HASH_VALIDATION" , "true" ) ;
185
234
env_logger:: init ( ) ;
186
235
let dev_opt = DevOpt :: parse ( ) ;
187
236
188
237
let database_dir = Path :: new ( & dev_opt. database_dir ) ;
189
238
190
239
let logger = logger ( true ) ;
191
240
192
- info ! ( logger, "Starting Graph Node Dev" ) ;
241
+ info ! ( logger, "Starting Graph Node Dev 1 " ) ;
193
242
info ! ( logger, "Database directory: {}" , database_dir. display( ) ) ;
194
243
195
- // Get the database URL
196
- let db_url = get_database_url ( dev_opt. postgres_url . as_ref ( ) , database_dir) ?;
244
+ // Get the database URL and keep the temporary database handle alive for the life of the
245
+ // program so that it is dropped (and cleaned up) on graceful shutdown.
246
+ let ( db_url, mut temp_db_opt) = get_database_url ( dev_opt. postgres_url . as_ref ( ) , database_dir) ?;
197
247
198
248
let opt = build_args ( & dev_opt, & db_url) ?;
199
249
200
250
let ( manifests_paths, source_subgraph_aliases) =
201
251
parse_manifest_args ( dev_opt. manifests , dev_opt. sources , & logger) ?;
202
252
let file_link_resolver = Arc :: new ( FileLinkResolver :: new ( None , source_subgraph_aliases. clone ( ) ) ) ;
203
253
204
- let ( tx, rx) = dev_opt . watch . then ( || mpsc:: channel ( 1 ) ) . unzip ( ) ;
254
+ let ( tx, rx) = mpsc:: channel ( 1 ) ;
205
255
206
256
let logger_clone = logger. clone ( ) ;
207
257
graph:: spawn ( async move {
208
258
let _ = run_graph_node ( & logger_clone, opt, file_link_resolver, rx) . await ;
209
259
} ) ;
210
260
211
- if let Some ( tx) = tx {
261
+ if let Err ( e) =
262
+ watcher:: deploy_all_subgraphs ( & logger, & manifests_paths, & source_subgraph_aliases, & tx)
263
+ . await
264
+ {
265
+ error ! ( logger, "Error deploying subgraphs" ; "error" => e. to_string( ) ) ;
266
+ std:: process:: exit ( 1 ) ;
267
+ }
268
+
269
+ if dev_opt. watch {
270
+ let logger_clone_watch = logger. clone ( ) ;
212
271
graph:: spawn_blocking ( async move {
213
272
if let Err ( e) = watch_subgraphs (
214
- & logger ,
273
+ & logger_clone_watch ,
215
274
manifests_paths,
216
275
source_subgraph_aliases,
217
276
vec ! [ "pgtemp-*" . to_string( ) ] ,
218
277
tx,
219
278
)
220
279
. await
221
280
{
222
- error ! ( logger , "Error watching subgraphs" ; "error" => e. to_string( ) ) ;
281
+ error ! ( logger_clone_watch , "Error watching subgraphs" ; "error" => e. to_string( ) ) ;
223
282
std:: process:: exit ( 1 ) ;
224
283
}
225
284
} ) ;
226
285
}
227
286
228
- graph:: futures03:: future:: pending :: < ( ) > ( ) . await ;
287
+ // Wait for Ctrl+C so we can shut down cleanly and drop the temporary database, which removes
288
+ // the data directory.
289
+ tokio:: signal:: ctrl_c ( )
290
+ . await
291
+ . expect ( "Failed to listen for Ctrl+C signal" ) ;
292
+ info ! ( logger, "Received Ctrl+C, shutting down." ) ;
293
+
294
+ // Explicitly shut down and clean up the temporary database directory if we started one.
295
+ #[ cfg( unix) ]
296
+ if let Some ( db) = temp_db_opt. take ( ) {
297
+ db. shutdown ( ) ;
298
+ }
299
+
300
+ std:: process:: exit ( 0 ) ;
301
+
302
+ #[ allow( unreachable_code) ]
229
303
Ok ( ( ) )
230
304
}
0 commit comments