@@ -24,7 +24,7 @@ require Exporter;
24
24
our @ISA = qw( Exporter) ;
25
25
our @EXPORT = qw( queue purge delete_corpus delete_service register_corpus register_service
26
26
service_to_id serviceid_to_iid serviceid_to_iid corpus_to_id corpus_report id_to_corpus id_to_service
27
- serviceiid_to_formats serviceiid_to_id serviceid_enables
27
+ serviceiid_to_formats serviceiid_to_id serviceid_enables serviceid_requires
28
28
count_entries count_messages
29
29
current_corpora current_services current_inputformats current_outputformats
30
30
service_report classic_report get_custom_entries
@@ -34,8 +34,10 @@ our @EXPORT = qw(queue purge delete_corpus delete_service register_corpus regist
34
34
repository_size mark_limbo_entries_queued get_entry_type
35
35
fetch_tasks complete_tasks) ;
36
36
37
- our (%CorpusIDs ,%ServiceIDs ,%IDServices ,%IDCorpora ,%ServiceFormats ,%ServiceIDEnables );
38
- our (%IIDs ,%IID_to_ID );
37
+ our (%CorpusIDs ,%ServiceIDs ,%IDServices ,%IDCorpora ,%ServiceFormats ); # Maps between internal and external names
38
+ our (%IIDs ,%IID_to_ID ); # More maps
39
+ our (%ServiceIDEnables ,%ServiceIDRequires ); # Dependencies
40
+
39
41
sub corpus_to_id {
40
42
my ($db , $corpus ) = @_ ;
41
43
my $corpusid = $CorpusIDs {$corpus };
@@ -78,6 +80,21 @@ sub serviceid_enables {
78
80
$ServiceIDEnables {$serviceid } = $enabled_services ;
79
81
}
80
82
return @$enabled_services ; }
83
+ sub serviceid_requires {
84
+ my ($db ,$serviceid ) = @_ ;
85
+ my $required_services = $ServiceIDRequires {$serviceid };
86
+ if (! defined $required_services ) {
87
+ $required_services = [];
88
+ my $sth = $db -> prepare(" SELECT foundation from dependencies where master=?" );
89
+ $sth -> execute($serviceid );
90
+ my $foundation_service ;
91
+ $sth -> bind_columns(\$foundation_service );
92
+ while ($sth -> fetch) {
93
+ push @$required_services , $foundation_service ;
94
+ }
95
+ $ServiceIDRequires {$serviceid } = $required_services ;
96
+ }
97
+ return @$required_services ; }
81
98
sub id_to_corpus {
82
99
my ($db , $corpusid ) = @_ ;
83
100
my $corpus = $IDCorpora {$corpusid };
@@ -123,13 +140,14 @@ sub delete_corpus {
123
140
$sth -> execute($corpusid );
124
141
return $db -> purge(corpusid => $corpusid ); }
125
142
126
- sub delete_service {
127
- my ($db ,$service ) = @_ ;
128
- return unless ($service && (length ($service )>0));
129
- my $serviceid = $db -> service_to_id($service );
130
- my $sth = $db -> prepare(" delete from services where serviceid=?" );
131
- $sth -> execute($serviceid );
132
- return $db -> purge(serviceid => $service ); }
143
+ # TODO: We don't have a good deleting workflow for now
144
+ # sub delete_service {
145
+ # my ($db,$service) = @_;
146
+ # return unless ($service && (length($service)>0));
147
+ # my $serviceid = $db->service_to_id($service);
148
+ # my $sth = $db->prepare("delete from services where serviceid=?");
149
+ # $sth->execute($serviceid);
150
+ # return $db->purge(serviceid=>$service); }
133
151
134
152
sub register_corpus {
135
153
my ($db ,$corpus ) = @_ ;
@@ -163,6 +181,7 @@ sub register_service {
163
181
$service {outputformat } = lc ($service {outputformat });
164
182
$service {requires_analyses } //= [];
165
183
$service {requires_aggregation } //= [];
184
+ $db -> do(' BEGIN TRANSACTION' );
166
185
my $sth = $db -> prepare(" INSERT INTO services
167
186
(name,version,iid,type,xpath,url,inputconverter,inputformat,outputformat,resource)
168
187
values(?,?,?,?,?,?,?,?,?,?)" );
@@ -174,30 +193,37 @@ sub register_service {
174
193
$sth = $db -> prepare(" INSERT INTO dependencies (master,foundation) values(?,?)" );
175
194
my $dependency_weight = 0;
176
195
my @dependencies = grep {defined } ($service {inputconverter },@{$service {requires_analyses }},@{$service {requires_aggregation }});
196
+ my @foundations = ();
177
197
foreach my $foundation (@dependencies ) {
178
198
next if $foundation eq ' import' ; # Built-in to always have completed prior to the service being registered
179
199
$dependency_weight ++;
180
200
my $foundation_id = $db -> service_to_id($foundation );
201
+ push @foundations , $foundation_id ;
181
202
$sth -> execute($id ,$foundation_id ); }
182
203
# Register Tasks on each corpus
183
204
my $status = -5 - $dependency_weight ;
184
- # For every import task, queue a task with the service $id
185
- # TODO: The list of tasks is proportional to the size of the corpus, so a big corpus will have millions of tasks
186
- # how do we register them quickly? Maybe a single transaction will do the trick for the insert...
187
- # but what about the enormous select? Do 3 million entries fit in memory?
205
+ # For every import task, queue a task with the new serviceid
188
206
my $entry_query = $db -> prepare(" SELECT entry from tasks where corpusid=? and serviceid=1 and status=-1" );
189
207
my $insert_query = $db -> prepare(" INSERT into tasks (corpusid,serviceid,entry,status) values(?,?,?,?)" );
208
+ my $complete_foundations_query =
209
+ $db -> prepare(" SELECT entry from tasks where corpusid=? and serviceid=? and (status=-1 or status=-2)" );
210
+ my $enable_tasks = $db -> prepare(" UPDATE tasks SET status = status + 1 WHERE entry=? and serviceid=?" );
190
211
foreach my $corpus (@{$service {corpora }}) {
191
212
my $corpusid = $db -> corpus_to_id($corpus );
192
213
$entry_query -> execute($corpusid );
193
- my ( $entry , @entries ) ;
214
+ my $entry ;
194
215
$entry_query -> bind_columns(\$entry );
195
- while ($entry_query -> fetch) { push @entries , $entry ; }
196
- $db -> do(' BEGIN TRANSACTION' );
197
- foreach my $e (@entries ) {
198
- $insert_query -> execute($corpusid ,$id ,$e ,$status ); }
199
- $db -> do(' COMMIT' );
216
+ while ($entry_query -> fetch) {
217
+ $insert_query -> execute($corpusid ,$id ,$entry ,$status ); }
218
+ # Once the generic tasks are inserted, observe already completed successful tasks
219
+ foreach my $foundation_id (@foundations ) {
220
+ $complete_foundations_query -> execute($corpusid ,$foundation_id );
221
+ $complete_foundations_query -> bind_columns(\$entry );
222
+ while ($complete_foundations_query -> fetch) {
223
+ $enable_tasks -> execute($entry ,$id );}
224
+ }
200
225
}
226
+ $db -> do(' COMMIT' );
201
227
return $id ; }
202
228
203
229
sub update_service {
@@ -347,17 +373,38 @@ sub service_description {
347
373
348
374
sub mark_entry_queued {
349
375
my ($db ,$data ) = @_ ;
350
- return unless ($data -> {corpus } && $data -> {service } && $data -> {entry });
351
- my $corpusid = $db -> corpus_to_id($data -> {corpus });
352
- my $serviceid = $db -> service_to_id($data -> {service });
353
- my $queue_entry_query = $db -> prepare(" UPDATE tasks SET status=-5
376
+ $data -> {serviceid } //= $db -> service_to_id($data -> {service });
377
+ $data -> {corpusid } //= $db -> corpus_to_id($data -> {corpus });
378
+ return unless ($data -> {corpusid } && $data -> {serviceid } && $data -> {entry });
379
+ my $corpusid = $data -> {corpusid };
380
+ my $serviceid = $data -> {serviceid };
381
+ my @required_services = $db -> serviceid_requires($serviceid );
382
+ my $count_complete_foundations =
383
+ $db -> prepare(" SELECT count(serviceid) from tasks where corpusid=? and entry=? and serviceid=? and (status=-1 or status=-2)" );
384
+ my $count =0;
385
+ foreach my $foundation (@required_services ) {
386
+ $count_complete_foundations -> execute($corpusid ,$data -> {entry },$foundation );
387
+ $count += $count_complete_foundations -> fetchrow_array();
388
+ }
389
+ my $status = -5 - scalar (@required_services ) + $count ;
390
+ print STDERR " Required: " ,scalar (@required_services )," \n " ;
391
+ print STDERR " Ready foundations: $count \n " ;
392
+ print STDERR " Set new status: $status \n " ;
393
+ my $queue_entry_query = $db -> prepare(" UPDATE tasks SET status=?
354
394
WHERE corpusid=? AND serviceid=? and entry=?" );
395
+
355
396
my $delete_messages_query = $db -> prepare(" DELETE from logs WHERE taskid IN
356
397
(SELECT logs.taskid FROM logs INNER JOIN tasks ON (tasks.taskid = logs.taskid)
357
- WHERE tasks.status=-5)" );
358
- $queue_entry_query -> execute($corpusid ,$serviceid ,$data -> {entry });
398
+ WHERE tasks.status < -4)" ); # All currently processed
399
+ $queue_entry_query -> execute($status ,$corpusid ,$serviceid ,$data -> {entry });
400
+ # Vote up for completed foundations
401
+
359
402
$delete_messages_query -> execute();
360
- # TODO: Also handle dependencies: +1 on any service depending on serviceid, for this entry
403
+ my @enabled_services = $db -> serviceid_enables($serviceid );
404
+ foreach my $enabled_service (@enabled_services ) {
405
+ $db -> mark_entry_queued({corpus => $data -> {corpus }, serviceid => $enabled_service , entry => $data -> {entry } });
406
+ }
407
+
361
408
return 1;
362
409
}
363
410
@@ -378,45 +425,65 @@ sub mark_custom_entries_queued {
378
425
if ($data -> {what }) {
379
426
$what = $data -> {what };
380
427
}}}
381
-
428
+
382
429
my $rerun_query ;
430
+ my @required_services = $db -> serviceid_requires($serviceid );
431
+ my $status = -5 - scalar (@required_services );
383
432
# Start a transaction
384
433
$db -> do(" BEGIN TRANSACTION" );
385
434
if ($what ) { # We have severity, category and what
386
435
# TODO: Propagate blocks to all (service,entry) pairs depending on this task
387
- # Mark for rerun = SET the status to all affected tasks to -5
388
- $rerun_query = $db -> prepare(" UPDATE tasks SET status=-5
436
+ # Mark for rerun = SET the status to all affected tasks to -5-foundations
437
+ $rerun_query = $db -> prepare(" UPDATE tasks SET status=?
389
438
WHERE taskid IN (SELECT tasks.taskid FROM tasks INNER JOIN logs ON (tasks.taskid = logs.taskid)
390
439
WHERE tasks.corpusid=? AND tasks.serviceid=? AND tasks.status$severity
391
440
AND logs.category=? and logs.what=?)" );
392
- $rerun_query -> execute($corpusid ,$serviceid ,$category ,$what ); }
441
+ $rerun_query -> execute($status , $ corpusid ,$serviceid ,$category ,$what ); }
393
442
elsif ($category ) { # We have severity and category
394
443
# TODO: Propagate blocks to all (service,entry) pairs depending on this task
395
- # Mark for rerun = SET the status to all affected tasks to -5
396
- $rerun_query = $db -> prepare(" UPDATE tasks SET status=-5
444
+ # Mark for rerun = SET the status to all affected tasks to -5-foundations
445
+ $rerun_query = $db -> prepare(" UPDATE tasks SET status=?
397
446
WHERE taskid IN (SELECT tasks.taskid FROM tasks INNER JOIN logs ON (tasks.taskid = logs.taskid)
398
447
WHERE tasks.corpusid=? AND tasks.serviceid=? AND tasks.status$severity
399
448
AND logs.category=?)" );
400
- $rerun_query -> execute($corpusid ,$serviceid ,$category ); }
449
+ $rerun_query -> execute($status , $ corpusid ,$serviceid ,$category ); }
401
450
elsif ($severity ) { # We have severity
402
451
# TODO: Propagate blocks to all (service,entry) pairs depending on this task
403
- # Mark for rerun = SET the status to all affected tasks to -5
404
- $rerun_query = $db -> prepare(" UPDATE tasks SET status=-5
452
+ # Mark for rerun = SET the status to all affected tasks to -5-foundations
453
+ $rerun_query = $db -> prepare(" UPDATE tasks SET status=?
405
454
WHERE corpusid=? AND serviceid=? AND status$severity " );
406
- $rerun_query -> execute($corpusid ,$serviceid ); }
455
+ $rerun_query -> execute($status , $ corpusid ,$serviceid ); }
407
456
else { # Simplest case, rerun an entire (corpus,service) pair.
408
- # Mark for rerun = SET the status to all affected tasks to -5
409
- $rerun_query = $db -> prepare(" UPDATE tasks SET status=-5
457
+ # Mark for rerun = SET the status to all affected tasks to -5-foundations
458
+ $rerun_query = $db -> prepare(" UPDATE tasks SET status=?
410
459
WHERE corpusid=? AND serviceid=?" );
411
- $rerun_query -> execute($corpusid ,$serviceid );
460
+ $rerun_query -> execute($status , $ corpusid ,$serviceid );
412
461
# TODO: Propagate blocks to all (service,entry) pairs depending on this task
413
462
}
414
463
# Delete all existing messages for tasks that are marked for rerun (status=-5)
415
-
416
464
my $delete_messages_query = $db -> prepare(" DELETE from logs WHERE taskid IN
417
465
(SELECT logs.taskid FROM logs INNER JOIN tasks ON (tasks.taskid = logs.taskid)
418
- WHERE tasks.status=-5 )" );
466
+ WHERE tasks.status<-4 )" );
419
467
$delete_messages_query -> execute();
468
+
469
+ # +1 for each foundation that has already completed
470
+ my $enable_tasks = $db -> prepare(" UPDATE tasks SET status = status + 1 WHERE entry=? and serviceid=?" );
471
+ my $complete_foundation_entries =
472
+ $db -> prepare(" SELECT entry from tasks where corpusid=? and serviceid=? and (status=-1 or status=-2)" );
473
+ my $count =0;
474
+ foreach my $foundation (@required_services ) {
475
+ $complete_foundation_entries -> execute($corpusid ,$foundation );
476
+ my $entry ;
477
+ $complete_foundation_entries -> bind_columns(\$entry );
478
+ while ($complete_foundation_entries -> fetch) {
479
+ $enable_tasks -> execute($entry ,$serviceid );
480
+ }
481
+ }
482
+
483
+ # Recursively rerun all enabled services
484
+
485
+
486
+
420
487
$db -> do(" COMMIT" );
421
488
}
422
489
@@ -776,17 +843,22 @@ sub complete_tasks {
776
843
my $entry = $result -> {entry };
777
844
my $taskid = $result -> {taskid };
778
845
my $iid = $result -> {service };
846
+ my $status = $result -> {status };
779
847
my $serviceid = $db -> serviceiid_to_id($iid );
780
- my @enables = $db -> serviceid_enables($serviceid );
781
- foreach my $enabled_service (@enables ) {
782
- $enable_tasks -> execute($entry ,$enabled_service );
783
- }
848
+ # Delete old messages
784
849
$delete_messages -> execute($taskid );
850
+ # Mark task as completed
785
851
$mark_complete -> execute($result -> {status },$taskid );
852
+ # Insert new messages
786
853
foreach my $message (@{$result -> {messages }||[]}) {
787
854
$message -> {severity } = status_code($message -> {severity });
788
855
$add_message -> execute($taskid ,map {$message -> {$_ }} qw/ severity category what details/ );
789
856
}
857
+ # Propagate in dependencies
858
+ if (($status == -1) || ($status == -2)) { # If warning or OK job
859
+ my @enables = $db -> serviceid_enables($serviceid );
860
+ foreach my $enabled_service (@enables ) { # Enable follow-up services
861
+ $enable_tasks -> execute($entry ,$enabled_service ); }}
790
862
}
791
863
$db -> do(' COMMIT' );
792
864
}
0 commit comments