Skip to content
This repository was archived by the owner on Sep 1, 2024. It is now read-only.

Commit 12430fc

Browse files
committed
Scheduler implementation (on going)
1 parent 947941e commit 12430fc

9 files changed

Lines changed: 430 additions & 215 deletions

File tree

eu.dariolucia.reatmetric.api/src/main/java/eu/dariolucia/reatmetric/api/scheduler/IScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ public interface IScheduler extends IScheduledActivityDataProvisionService {
6161
*/
6262
ScheduledActivityData update(IUniqueId originalId, SchedulingRequest newRequest, CreationConflictStrategy conflictStrategy) throws SchedulingException;
6363

64-
boolean remove(IUniqueId scheduledId) throws SchedulingException;
64+
void remove(IUniqueId scheduledId) throws SchedulingException;
6565

66-
boolean remove(ScheduledActivityDataFilter filter) throws SchedulingException;
66+
void remove(ScheduledActivityDataFilter filter) throws SchedulingException;
6767

6868
/**
6969
* This operation removes all the scheduled activities between startTime and endTime belonging to the provided scheduling source,

eu.dariolucia.reatmetric.api/src/main/java/eu/dariolucia/reatmetric/api/scheduler/ScheduledActivityData.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import eu.dariolucia.reatmetric.api.common.IUniqueId;
55
import eu.dariolucia.reatmetric.api.processing.input.ActivityRequest;
66

7+
import java.time.Duration;
78
import java.time.Instant;
89
import java.util.Collection;
910
import java.util.Set;
@@ -24,6 +25,9 @@ public final class ScheduledActivityData extends AbstractDataItem {
2425

2526
private final Instant latestInvocationTime;
2627

28+
private final Instant startTime;
29+
private final Instant endTime;
30+
2731
private final ConflictStrategy conflictStrategy;
2832

2933
private final SchedulingState state;
@@ -43,7 +47,7 @@ public final class ScheduledActivityData extends AbstractDataItem {
4347
* @param state
4448
* @param extension
4549
*/
46-
public ScheduledActivityData(IUniqueId internalId, Instant generationTime, ActivityRequest request, IUniqueId activityOccurrence, Collection<String> resources, String source, long externalId, AbstractSchedulingTrigger trigger, Instant latestInvocationTime, ConflictStrategy conflictStrategy, SchedulingState state, Object extension) {
50+
public ScheduledActivityData(IUniqueId internalId, Instant generationTime, ActivityRequest request, IUniqueId activityOccurrence, Collection<String> resources, String source, long externalId, AbstractSchedulingTrigger trigger, Instant latestInvocationTime, Instant startTime, Instant endTime, ConflictStrategy conflictStrategy, SchedulingState state, Object extension) {
4751
super(internalId, generationTime, extension);
4852
if(resources == null) {
4953
throw new NullPointerException("Resources cannot be null");
@@ -64,6 +68,8 @@ public ScheduledActivityData(IUniqueId internalId, Instant generationTime, Activ
6468
this.latestInvocationTime = latestInvocationTime;
6569
this.conflictStrategy = conflictStrategy;
6670
this.state = state;
71+
this.startTime = startTime;
72+
this.endTime = endTime;
6773
}
6874

6975
public ActivityRequest getRequest() {
@@ -101,4 +107,21 @@ public ConflictStrategy getConflictStrategy() {
101107
public SchedulingState getState() {
102108
return state;
103109
}
110+
111+
public Instant getStartTime() {
112+
return startTime;
113+
}
114+
115+
public Instant getEndTime() {
116+
return endTime;
117+
}
118+
119+
public Duration getExpectedDuration() {
120+
return Duration.between(startTime, endTime);
121+
}
122+
123+
public boolean overlapsWith(Instant otherStartTime, Instant otherEndTime) {
124+
return (startTime.isAfter(otherStartTime) && startTime.isBefore(otherEndTime)) ||
125+
(endTime.isAfter(otherStartTime) && endTime.isBefore(otherEndTime));
126+
}
104127
}

eu.dariolucia.reatmetric.api/src/main/java/eu/dariolucia/reatmetric/api/scheduler/SchedulingState.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,9 @@ public enum SchedulingState {
3131
/**
3232
* The activity occurrence was completed unsuccessfully (status is COMPLETED-!OK)
3333
*/
34-
FINISHED_FAIL
34+
FINISHED_FAIL,
35+
/**
36+
* The activity has been removed from the scheduler when it was in SCHEDULED state.
37+
*/
38+
REMOVED;
3539
}

eu.dariolucia.reatmetric.api/src/main/java/eu/dariolucia/reatmetric/api/scheduler/input/SchedulingRequest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ public final class SchedulingRequest extends AbstractInputDataItem {
2828
private final Duration expectedDuration;
2929

3030
public SchedulingRequest(ActivityRequest request, Set<String> resources, String source, long externalId, AbstractSchedulingTrigger trigger, Instant latestInvocationTime, ConflictStrategy conflictStrategy, Duration expectedDuration) {
31+
if(expectedDuration == null) {
32+
throw new NullPointerException("Expected duration must be provided");
33+
}
3134
this.request = request;
3235
this.resources = Set.copyOf(resources);
3336
this.source = source;

eu.dariolucia.reatmetric.persist/src/main/java/eu/dariolucia/reatmetric/persist/services/ScheduledActivityDataArchive.java

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,10 @@
1616

1717
package eu.dariolucia.reatmetric.persist.services;
1818

19-
import eu.dariolucia.reatmetric.api.archive.exceptions.ArchiveException;
2019
import eu.dariolucia.reatmetric.api.common.AbstractDataItem;
2120
import eu.dariolucia.reatmetric.api.common.IUniqueId;
2221
import eu.dariolucia.reatmetric.api.common.LongUniqueId;
2322
import eu.dariolucia.reatmetric.api.common.RetrievalDirection;
24-
import eu.dariolucia.reatmetric.api.messages.*;
2523
import eu.dariolucia.reatmetric.api.model.SystemEntityPath;
2624
import eu.dariolucia.reatmetric.api.processing.input.ActivityRequest;
2725
import eu.dariolucia.reatmetric.api.scheduler.*;
@@ -39,10 +37,10 @@ public class ScheduledActivityDataArchive extends AbstractDataItemArchive<Schedu
3937
private static final Logger LOG = Logger.getLogger(ScheduledActivityDataArchive.class.getName());
4038

4139
private static final String STORE_STATEMENT = "MERGE INTO SCHEDULED_ACTIVITY_DATA_TABLE USING SYSIBM.SYSDUMMY1 ON UniqueId = ? " +
42-
"WHEN MATCHED THEN UPDATE SET GenerationTime = ?, ActivityRequest = ?, Path = ?, ActivityOccurrence = ?, Resources = ?, Source = ?, ExternalId = ?, Trigger = ?, LatestInvocationTime = ?, ConflictStrategy = ?, State = ?, AdditionalData = ? " +
43-
"WHEN NOT MATCHED THEN INSERT (UniqueId,GenerationTime,ActivityRequest,Path,ActivityOccurrence,Resources,Source,ExternalId,Trigger,LatestInvocationTime,ConflictStrategy,State,AdditionalData) VALUES (?,?,?, ?,?,?, ?,?,?, ?,?,?, ?)";
40+
"WHEN MATCHED THEN UPDATE SET GenerationTime = ?, ActivityRequest = ?, Path = ?, ActivityOccurrence = ?, Resources = ?, Source = ?, ExternalId = ?, Trigger = ?, LatestInvocationTime = ?, StartTime = ?, EndTime = ?, ConflictStrategy = ?, State = ?, AdditionalData = ? " +
41+
"WHEN NOT MATCHED THEN INSERT (UniqueId,GenerationTime,ActivityRequest,Path,ActivityOccurrence,Resources,Source,ExternalId,Trigger,LatestInvocationTime,StartTime,EndTime,ConflictStrategy,State,AdditionalData) VALUES (?,?,?, ?,?,?, ?,?,?, ?,?,?, ?,?,?)";
4442
private static final String LAST_ID_QUERY = "SELECT UniqueId FROM SCHEDULED_ACTIVITY_DATA_TABLE ORDER BY UniqueId DESC FETCH FIRST ROW ONLY";
45-
private static final String RETRIEVE_BY_ID_QUERY = "SELECT UniqueId,GenerationTime,ActivityRequest,Path,ActivityOccurrence,Resources,Source,ExternalId,Trigger,LatestInvocationTime,ConflictStrategy,State,AdditionalData " +
43+
private static final String RETRIEVE_BY_ID_QUERY = "SELECT UniqueId,GenerationTime,ActivityRequest,Path,ActivityOccurrence,Resources,Source,ExternalId,Trigger,LatestInvocationTime,StartTime,EndTime,ConflictStrategy,State,AdditionalData " +
4644
"FROM SCHEDULED_ACTIVITY_DATA_TABLE " +
4745
"WHERE UniqueId=?";
4846
private static final String LAST_GENERATION_TIME_QUERY = "SELECT MAX(GenerationTime) FROM SCHEDULED_ACTIVITY_DATA_TABLE";
@@ -72,40 +70,44 @@ protected void setItemPropertiesToStatement(PreparedStatement storeStatement, Sc
7270
} else {
7371
storeStatement.setNull(10, Types.BLOB);
7472
}
75-
storeStatement.setShort(11, (short) item.getConflictStrategy().ordinal());
76-
storeStatement.setShort(12, (short) item.getState().ordinal());
73+
storeStatement.setTimestamp(11, toTimestamp(item.getStartTime()));
74+
storeStatement.setTimestamp(12, toTimestamp(item.getEndTime()));
75+
storeStatement.setShort(13, (short) item.getConflictStrategy().ordinal());
76+
storeStatement.setShort(14, (short) item.getState().ordinal());
7777
Object extension = item.getExtension();
7878
if(extension == null) {
79-
storeStatement.setNull(13, Types.BLOB);
79+
storeStatement.setNull(15, Types.BLOB);
8080
} else {
81-
storeStatement.setBlob(13, toInputstream(item.getExtension()));
81+
storeStatement.setBlob(15, toInputstream(item.getExtension()));
8282
}
8383

8484

85-
storeStatement.setLong(14, item.getInternalId().asLong());
86-
storeStatement.setTimestamp(15, toTimestamp(item.getGenerationTime()));
87-
storeStatement.setBlob(16, toInputstream(item.getRequest()));
88-
storeStatement.setString(17, item.getRequest().getPath().asString());
85+
storeStatement.setLong(16, item.getInternalId().asLong());
86+
storeStatement.setTimestamp(17, toTimestamp(item.getGenerationTime()));
87+
storeStatement.setBlob(18, toInputstream(item.getRequest()));
88+
storeStatement.setString(19, item.getRequest().getPath().asString());
8989
if(item.getActivityOccurrence() != null) {
90-
storeStatement.setLong(18, item.getActivityOccurrence().asLong());
90+
storeStatement.setLong(20, item.getActivityOccurrence().asLong());
9191
} else {
92-
storeStatement.setNull(18, Types.BIGINT);
92+
storeStatement.setNull(20, Types.BIGINT);
9393
}
94-
storeStatement.setString(19, resources);
95-
storeStatement.setString(20, item.getSource());
96-
storeStatement.setLong(21, item.getExternalId());
97-
storeStatement.setBlob(22, toInputstream(item.getTrigger()));
94+
storeStatement.setString(21, resources);
95+
storeStatement.setString(22, item.getSource());
96+
storeStatement.setLong(23, item.getExternalId());
97+
storeStatement.setBlob(24, toInputstream(item.getTrigger()));
9898
if(item.getLatestInvocationTime() != null) {
99-
storeStatement.setTimestamp(23, toTimestamp(item.getLatestInvocationTime()));
99+
storeStatement.setTimestamp(25, toTimestamp(item.getLatestInvocationTime()));
100100
} else {
101-
storeStatement.setNull(23, Types.BLOB);
101+
storeStatement.setNull(25, Types.BLOB);
102102
}
103-
storeStatement.setShort(24, (short) item.getConflictStrategy().ordinal());
104-
storeStatement.setShort(25, (short) item.getState().ordinal());
103+
storeStatement.setTimestamp(26, toTimestamp(item.getStartTime()));
104+
storeStatement.setTimestamp(27, toTimestamp(item.getEndTime()));
105+
storeStatement.setShort(28, (short) item.getConflictStrategy().ordinal());
106+
storeStatement.setShort(29, (short) item.getState().ordinal());
105107
if(extension == null) {
106-
storeStatement.setNull(26, Types.BLOB);
108+
storeStatement.setNull(30, Types.BLOB);
107109
} else {
108-
storeStatement.setBlob(26, toInputstream(item.getExtension()));
110+
storeStatement.setBlob(30, toInputstream(item.getExtension()));
109111
}
110112
}
111113

@@ -204,15 +206,19 @@ protected ScheduledActivityData mapToItem(ResultSet rs, ScheduledActivityDataFil
204206
if(rs.wasNull()) {
205207
latestInvocTime = null;
206208
}
207-
ConflictStrategy conflictStrategy = ConflictStrategy.values()[rs.getShort(11)];
208-
SchedulingState state = SchedulingState.values()[rs.getShort(12)];
209-
Blob extensionBlob = rs.getBlob(13);
209+
Timestamp startTime = rs.getTimestamp(11);
210+
211+
Timestamp endTime = rs.getTimestamp(12);
212+
213+
ConflictStrategy conflictStrategy = ConflictStrategy.values()[rs.getShort(13)];
214+
SchedulingState state = SchedulingState.values()[rs.getShort(14)];
215+
Blob extensionBlob = rs.getBlob(15);
210216
Object extension = null;
211217
if(extensionBlob != null && !rs.wasNull()) {
212218
extension = toObject(extensionBlob);
213219
}
214220
return new ScheduledActivityData(new LongUniqueId(uniqueId), toInstant(genTime), request,
215-
actOcc == null ? null : new LongUniqueId(actOcc), resources, source, extId, trigger, toInstant(latestInvocTime), conflictStrategy, state, extension);
221+
actOcc == null ? null : new LongUniqueId(actOcc), resources, source, extId, trigger, toInstant(latestInvocTime), toInstant(startTime), toInstant(endTime), conflictStrategy, state, extension);
216222
}
217223

218224
private Set<String> parseResources(String string) {

eu.dariolucia.reatmetric.persist/src/main/resources/schema.ddl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ CREATE TABLE SCHEDULED_ACTIVITY_DATA_TABLE (
128128
ExternalId BIGINT NOT NULL,
129129
Trigger BLOB NOT NULL,
130130
LatestInvocationTime TIMESTAMP,
131+
StartTime TIMESTAMP NOT NULL,
132+
EndTime TIMESTAMP NOT NULL,
131133
ConflictStrategy SMALLINT NOT NULL,
132134
State SMALLINT NOT NULL,
133135
AdditionalData BLOB,

0 commit comments

Comments
 (0)