@@ -245,8 +245,8 @@ private void initFromArchive() {
245245 // Transition the RUNNING and WAITING activities in:
246246 // RUNNING -> UNKNOWN
247247 // WAITING -> ABORTED
248- // TODO: transition activities that are SCHEDULED and scheduled time is in the past --> ABORTED
249- // See method restoreActivitiesFromList
248+ // Transition activities that are SCHEDULED and scheduled time is in the past --> ABORTED
249+ // See method restoreActivitiesFromList
250250 try {
251251 // This part retrieves everything but not the event-based scheduled activities, for which the generation time is set
252252 // to EPOCH.
@@ -257,7 +257,7 @@ private void initFromArchive() {
257257
258258 restoreActivitiesFromList (scheduledItems );
259259
260- // Now it is the time of the event-based activities
260+ // Now it is the time of the event-based activities, start and end time set to EPOCH
261261 scheduledItems = archive .retrieve (Instant .EPOCH ,
262262 new ScheduledActivityDataFilter (null , null , null , null ,
263263 Collections .singletonList (SchedulingState .SCHEDULED ), null ),
@@ -271,14 +271,97 @@ private void initFromArchive() {
271271 }
272272
273273 private void restoreActivitiesFromList (List <ScheduledActivityData > scheduledItems ) throws SchedulingException {
274+ Instant now = Instant .now ();
275+ // Partition: first schedule all the AbsoluteTimeSchedulingTrigger and the others are in another group
276+ List <ScheduledActivityData > absoluteScheduled = scheduledItems .stream ().filter (i -> i .getTrigger () instanceof AbsoluteTimeSchedulingTrigger ).collect (Collectors .toList ());
277+ List <ScheduledActivityData > relativeScheduled = scheduledItems .stream ().filter (i -> i .getTrigger () instanceof RelativeTimeSchedulingTrigger ).collect (Collectors .toList ());
278+ List <ScheduledActivityData > otherScheduled = scheduledItems .stream ().filter (i -> !(i .getTrigger () instanceof AbsoluteTimeSchedulingTrigger ) &&
279+ !(i .getTrigger () instanceof RelativeTimeSchedulingTrigger )).collect (Collectors .toList ());
280+ // Sort the relative-scheduled items
281+ relativeScheduled = buildSortedRelativeScheduled (relativeScheduled );
282+ // Restore items
283+ restoreActivities (absoluteScheduled , now );
284+ restoreActivities (otherScheduled , now );
285+ restoreActivities (relativeScheduled , now );
286+ }
287+
288+ /**
289+ * The idea of this method is to build a list of scheduled activities, which would be able to resolve their predecessors
290+ * as they will be added to the scheduler. Therefore, the list will first have the activities without predecessors
291+ * (or with predecessors of other types). And only then, the activities with predecessors also of type relative-time,
292+ * in the correct order.
293+ * @param relativeScheduled the list to order
294+ * @return the ordered list
295+ */
296+ private List <ScheduledActivityData > buildSortedRelativeScheduled (List <ScheduledActivityData > relativeScheduled ) {
297+ List <ScheduledActivityData > relativeScheduledToReturn = new ArrayList <>(relativeScheduled .size ());
298+ Queue <ScheduledActivityData > toProcess = new LinkedList <>(relativeScheduled );
299+ int pushBackIterations = 0 ;
300+ while (!toProcess .isEmpty ()) {
301+ // Get the top
302+ ScheduledActivityData d = toProcess .remove ();
303+ RelativeTimeSchedulingTrigger trigger = (RelativeTimeSchedulingTrigger ) d .getTrigger ();
304+ // Get the referenced activities
305+ boolean backInQueue = false ;
306+ for (String pred : trigger .getPredecessors ()) {
307+ // Look for predecessor in the queue: if it is there, then put the item back in the queue
308+ for (ScheduledActivityData sad : toProcess ) {
309+ if (sad .getExternalId ().equals (pred )) {
310+ toProcess .add (d );
311+ backInQueue = true ;
312+ ++pushBackIterations ;
313+ break ;
314+ }
315+ }
316+ if (backInQueue ) {
317+ break ;
318+ }
319+ }
320+ if (!backInQueue ) {
321+ relativeScheduledToReturn .add (d );
322+ pushBackIterations = 0 ;
323+ }
324+ if (pushBackIterations > toProcess .size () + 1 ) {
325+ LOG .warning ("Cycle detected when restoring scheduled items with relative time" );
326+ // Cycle, stop and copy all remaining data in the list to return.
327+ // It should not happen.
328+ relativeScheduledToReturn .addAll (toProcess );
329+ break ;
330+ }
331+ }
332+ return relativeScheduledToReturn ;
333+ }
334+
335+ private void restoreActivities (List <ScheduledActivityData > scheduledItems , Instant now ) throws SchedulingException {
274336 for (ScheduledActivityData item : scheduledItems ) {
275337 if (item .getState () == SchedulingState .SCHEDULED ) {
276- // Create ScheduledTask
277- ScheduledTask st = new ScheduledTask (this , timer , dispatcher , item );
278- id2scheduledTask .put (st .getId (), st );
279- // Prepare execution event depending on trigger (absolute, relative, event)
280- st .armTrigger ();
338+ if (isToBeScheduled (item , now )) {
339+ LOG .info ("Restoring scheduled activity: " + item );
340+ // Create ScheduledTask
341+ ScheduledTask st = new ScheduledTask (this , timer , dispatcher , item );
342+ id2scheduledTask .put (st .getId (), st );
343+ // Prepare execution event depending on trigger (absolute, relative, event)
344+ st .armTrigger ();
345+ } else {
346+ LOG .warning ("Scheduled activity not restored (ABORTED): " + item );
347+ // ABORTED, it will never start
348+ storeAndDistribute (new ScheduledActivityData (item .getInternalId (),
349+ item .getGenerationTime (),
350+ item .getRequest (),
351+ item .getActivityOccurrence (),
352+ item .getResources (),
353+ item .getSource (),
354+ item .getExternalId (),
355+ item .getTrigger (),
356+ item .getLatestInvocationTime (),
357+ item .getStartTime (),
358+ item .getDuration (),
359+ item .getConflictStrategy (),
360+ SchedulingState .ABORTED ,
361+ item .getExtension ()));
362+ }
281363 } else if (item .getState () == SchedulingState .RUNNING ) {
364+ LOG .warning ("Scheduled running activity restored with status UNKNOWN: " + item );
282365 storeAndDistribute (new ScheduledActivityData (item .getInternalId (),
283366 item .getGenerationTime (),
284367 item .getRequest (),
@@ -294,6 +377,7 @@ private void restoreActivitiesFromList(List<ScheduledActivityData> scheduledItem
294377 SchedulingState .UNKNOWN ,
295378 item .getExtension ()));
296379 } else if (item .getState () == SchedulingState .WAITING ) {
380+ LOG .warning ("Scheduled waiting activity restored with status ABORTED: " + item );
297381 storeAndDistribute (new ScheduledActivityData (item .getInternalId (),
298382 item .getGenerationTime (),
299383 item .getRequest (),
@@ -312,6 +396,45 @@ private void restoreActivitiesFromList(List<ScheduledActivityData> scheduledItem
312396 }
313397 }
314398
399+ private boolean isToBeScheduled (ScheduledActivityData item , Instant now ) {
400+ if (this .configuration .isRunPastScheduledActivities ()) {
401+ return true ;
402+ } else if (item .getTrigger () instanceof EventBasedSchedulingTrigger ) {
403+ // Always schedule
404+ return true ;
405+ } else if (item .getTrigger () instanceof AbsoluteTimeSchedulingTrigger ) {
406+ Instant expectedStart = ((AbsoluteTimeSchedulingTrigger ) item .getTrigger ()).getReleaseTime ();
407+ return expectedStart .equals (now ) || expectedStart .isAfter (now );
408+ } else if (item .getTrigger () instanceof NowSchedulingTrigger ) {
409+ // NowSchedulingTrigger are inaccurate, they were supposed to start right away, so at this stage do not restore
410+ return false ;
411+ } else if (item .getTrigger () instanceof RelativeTimeSchedulingTrigger ) {
412+ RelativeTimeSchedulingTrigger trigger = (RelativeTimeSchedulingTrigger ) item .getTrigger ();
413+ // Check that predecessors are all in the schedule and with right state
414+ for (String extId : trigger .getPredecessors ()) {
415+ boolean found = false ;
416+ for (ScheduledTask st : id2scheduledTask .values ()) {
417+ if (Objects .equals (st .getCurrentData ().getExternalId (), extId )) {
418+ found = true ;
419+ // If the status is SCHEDULED --> OK, else return false
420+ if (st .getCurrentData ().getState () != SchedulingState .SCHEDULED ) {
421+ return false ;
422+ }
423+ break ;
424+ }
425+ }
426+ if (!found ) {
427+ // Well, in the past or non-existing --> do not schedule
428+ return false ;
429+ }
430+ }
431+ return true ;
432+ } else {
433+ // ???
434+ return false ;
435+ }
436+ }
437+
315438 @ Override
316439 public void subscribe (ISchedulerSubscriber subscriber ) {
317440 dispatcher .submit (() -> {
@@ -718,7 +841,7 @@ void removeTask(IUniqueId scheduledId) {
718841 // remove from the current internal set
719842 ScheduledTask st = id2scheduledTask .remove (scheduledId );
720843 if (st != null ) {
721- LOG .info (String .format ("Removing scheduled task %s (%d )" , st .getRequest ().getRequest ().getPath ().asString (), st .getId ().asLong ()));
844+ LOG .info (String .format ("Removing scheduled task %s (%s )" , st .getRequest ().getRequest ().getPath ().asString (), st .getRequest ().getExternalId ()));
722845 st .abortTask ();
723846 // Update or remove in the archive
724847 ScheduledActivityData sad = st .getCurrentData ();
0 commit comments