Skip to content
This repository was archived by the owner on Sep 1, 2024. It is now read-only.

Commit 2da2dba

Browse files
committed
Optimisation in archive time range retrieval
1 parent f372cc7 commit 2da2dba

17 files changed

Lines changed: 433 additions & 30 deletions

File tree

eu.dariolucia.reatmetric.api/src/main/java/eu/dariolucia/reatmetric/api/archive/IDataItemArchive.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,19 @@ public interface IDataItemArchive<T extends AbstractDataItem, K extends Abstract
6060
*/
6161
List<T> retrieve(T startItem, int numRecords, RetrievalDirection direction, K filter) throws ArchiveException;
6262

63+
/**
64+
* Retrieve data items, between the provided startItem and the provided endTime (included), and matching the provided filter.
65+
* The returned list is ordered according to generation time (ascending order if startItem < endTime, descending order
66+
* if startItem > endTime).
67+
*
68+
* @param startTime the start time used as reference for the retrieval
69+
* @param endTime the end time used as reference for the retrieval
70+
* @param filter the filter, it can be null
71+
* @return the list of retrieved items
72+
* @throws ArchiveException in case of I/O problems, SQL problems or any other problem preventing the retrieval operation to be completed successfully
73+
*/
74+
List<T> retrieve(Instant startTime, Instant endTime, K filter) throws ArchiveException;
75+
6376
/**
6477
* Retrieve the status of the data item matching the filter at the specified time. Not all data item archive services
6578
* support this operation. If the operation is not supported, the archive service is entitled to throw an {@link UnsupportedOperationException}.

eu.dariolucia.reatmetric.api/src/main/java/eu/dariolucia/reatmetric/api/common/IDataItemProvisionService.java

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -103,34 +103,5 @@ public interface IDataItemProvisionService<T extends IDataItemSubscriber<K>, R e
103103
* @throws ReatmetricException if a problem arises with the retrieval operation
104104
* @throws RemoteException in case of remoting problem
105105
*/
106-
default List<K> retrieve(Instant startTime, Instant endTime, R filter) throws ReatmetricException, RemoteException {
107-
if(startTime.isAfter(endTime)) {
108-
throw new IllegalArgumentException("Start time is after end time");
109-
}
110-
List<K> data = new LinkedList<>();
111-
boolean timeWindowCovered = false;
112-
113-
List<K> temp = retrieve(startTime, 100, RetrievalDirection.TO_FUTURE, filter);
114-
while(temp != null && !temp.isEmpty() && !timeWindowCovered) {
115-
// Add all items to the list that have gen. time < now
116-
for(K pd : temp) {
117-
if(pd.getGenerationTime().isBefore(endTime)) {
118-
data.add(pd);
119-
} else if(pd.getGenerationTime().equals(endTime)) {
120-
data.add(pd);
121-
// Stop the retrieval
122-
timeWindowCovered = true;
123-
break;
124-
} else {
125-
// Stop the retrieval
126-
timeWindowCovered = true;
127-
break;
128-
}
129-
}
130-
if(!timeWindowCovered) {
131-
temp = retrieve(temp.get(temp.size() - 1), 100, RetrievalDirection.TO_FUTURE, filter);
132-
}
133-
}
134-
return data;
135-
}
106+
List<K> retrieve(Instant startTime, Instant endTime, R filter) throws ReatmetricException, RemoteException;
136107
}

eu.dariolucia.reatmetric.core/src/main/java/eu/dariolucia/reatmetric/core/impl/AcknowledgedMessageBrokerImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,16 @@ public List<AcknowledgedMessage> retrieve(AcknowledgedMessage startItem, int num
102102
}
103103
}
104104

105+
@Override
106+
public List<AcknowledgedMessage> retrieve(Instant startTime, Instant endTime, AcknowledgedMessageFilter filter) throws ReatmetricException, RemoteException {
107+
// Access the archive and query it
108+
if(archive != null) {
109+
return archive.retrieve(startTime, endTime, filter);
110+
} else {
111+
return Collections.emptyList();
112+
}
113+
}
114+
105115
// To be called by the object owning the instance
106116
void distribute(List<OperationalMessage> items) throws ReatmetricException {
107117
// Transform the list in messages to be acknowledged

eu.dariolucia.reatmetric.core/src/main/java/eu/dariolucia/reatmetric/core/impl/OperationalMessageBrokerImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,16 @@ public List<OperationalMessage> retrieve(OperationalMessage startItem, int numRe
105105
}
106106
}
107107

108+
@Override
109+
public List<OperationalMessage> retrieve(Instant startTime, Instant endTime, OperationalMessageFilter filter) throws ReatmetricException, RemoteException {
110+
// Access the archive and query it
111+
if(archive != null) {
112+
return archive.retrieve(startTime, endTime, filter);
113+
} else {
114+
return Collections.emptyList();
115+
}
116+
}
117+
108118
private void distribute(List<OperationalMessage> items, boolean store) throws ReatmetricException {
109119
if(store && archive != null) {
110120
archive.store(items);

eu.dariolucia.reatmetric.core/src/main/java/eu/dariolucia/reatmetric/core/impl/RawDataBrokerImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,16 @@ public List<RawData> retrieve(RawData startItem, int numRecords, RetrievalDirect
101101
}
102102
}
103103

104+
@Override
105+
public List<RawData> retrieve(Instant startTime, Instant endTime, RawDataFilter filter) throws ReatmetricException, RemoteException {
106+
// Access the archive and query it
107+
if(archive != null) {
108+
return archive.retrieve(startTime, endTime, filter);
109+
} else {
110+
return Collections.emptyList();
111+
}
112+
}
113+
104114
@Override
105115
public void distribute(List<RawData> items, boolean store) throws ReatmetricException {
106116
if(store && archive != null) {

eu.dariolucia.reatmetric.core/src/main/java/eu/dariolucia/reatmetric/core/impl/managers/AbstractAccessManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,14 @@ public List<T> retrieve(T startItem, int numRecords, RetrievalDirection directio
109109
}
110110
}
111111

112+
public List<T> retrieve(Instant startTime, Instant endTime, K filter) throws ReatmetricException {
113+
if(archive != null) {
114+
return archive.retrieve(startTime, endTime, filter);
115+
} else {
116+
throw new ReatmetricException(getName() + " - Archive not available");
117+
}
118+
}
119+
112120
public void dispose() {
113121
for(AbstractAccessSubscriber<T, K, J> aas : this.subscribers.values()) {
114122
aas.terminate();

eu.dariolucia.reatmetric.driver.httpserver/src/main/resources/reatmetric.html

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,35 @@
246246
parameterMap = null;
247247
}
248248

249+
async function retrieveParameterHistory() {
250+
await unsubscribeFromEvents();
251+
await unsubscribeFromMessages();
252+
await unsubscribeFromParameters();
253+
254+
var divToAttach = document.getElementById('div1');
255+
divToAttach.textContent = '';
256+
257+
var filter = reatmetric.parameterFilter(null, null, null, null, null, null);
258+
259+
// Create table node
260+
var thTextArray = ['Path', 'Gen. Time', 'Raw Value', 'Eng. Value', 'Validity', 'Alarm State'];
261+
var thWidthArray = [20, 20, 20, 20, 10, 10];
262+
tableBodyVar = createTable(divToAttach, thTextArray, thWidthArray);
263+
264+
var sTime = Date.now() - 3600 * 1000 * 5;
265+
var eTime = Date.now();
266+
var currentParameters = await reatmetric.retrieveParameters(sTime,eTime,filter);
267+
268+
var tdTextArrayKeys = ['path', 'gentime', 'raw', 'eng', 'validity', 'alarm'];
269+
if(currentParameters != null) {
270+
console.log("Retrieved " + currentParameters.length + " parameters");
271+
for(var i = 0; i < currentParameters.length; ++i) {
272+
var message = currentParameters[i];
273+
addRow(tableBodyVar, message, tdTextArrayKeys);
274+
}
275+
}
276+
}
277+
249278
async function subscribeToMessages() {
250279
if(subMsgKey != null) {
251280
return;
@@ -384,6 +413,7 @@
384413
<button type="button" onclick="subscribeToMessages()">Subscribe to messages</button>
385414
<button type="button" onclick="unsubscribeFromMessages()">Unsubscribe from messages</button>
386415
<button type="button" onclick="retrieveMessageHistory()">Retrieve messages</button>
416+
<button type="button" onclick="retrieveParameterHistory()">Retrieve parameters</button>
387417
<hr/>
388418
<br/>
389419
<div id="div1" class="tableFixHead">

eu.dariolucia.reatmetric.persist/src/main/java/eu/dariolucia/reatmetric/persist/services/AbstractDataItemArchive.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,8 @@ protected List<T> doRetrieve(Connection connection, Instant startTime, int numRe
298298

299299
protected abstract String buildRetrieveQuery(Instant startTime, int numRecords, RetrievalDirection direction, K filter);
300300

301+
protected abstract String buildRetrieveQuery(Instant startTime, Instant endTime, boolean ascending, K filter);
302+
301303
public synchronized List<T> retrieve(T startItem, int numRecords, RetrievalDirection direction, K filter) throws ArchiveException {
302304
if (LOG.isLoggable(Level.FINER)) {
303305
LOG.finer(this + " - retrieve(T,int,RetrievalDirection,K) called: startItem=" + startItem + ", numRecords=" + numRecords + ", direction=" + direction);
@@ -310,6 +312,51 @@ public synchronized List<T> retrieve(T startItem, int numRecords, RetrievalDirec
310312
}
311313
}
312314

315+
public synchronized List<T> retrieve(Instant startTime, Instant endTime, K filter) throws ArchiveException {
316+
if (LOG.isLoggable(Level.FINER)) {
317+
LOG.finer(this + " - retrieve(T,int,RetrievalDirection,K) called: startItem=" + startTime + ", endTime=" + endTime);
318+
}
319+
checkDisposed();
320+
try {
321+
return doRetrieve(retrieveConnection, startTime, endTime, filter);
322+
} catch (SQLException e) {
323+
throw new ArchiveException(e);
324+
}
325+
}
326+
327+
protected List<T> doRetrieve(Connection connection, Instant startTime, Instant endTime, K filter) throws SQLException {
328+
if (startTime.isBefore(MINIMUM_TIME)) {
329+
startTime = MINIMUM_TIME;
330+
} else if (startTime.isAfter(MAXIMUM_TIME)) {
331+
startTime = MAXIMUM_TIME;
332+
}
333+
if (endTime.isBefore(MINIMUM_TIME)) {
334+
endTime = MINIMUM_TIME;
335+
} else if (endTime.isAfter(MAXIMUM_TIME)) {
336+
endTime = MAXIMUM_TIME;
337+
}
338+
String finalQuery = buildRetrieveQuery(startTime, endTime, startTime.isBefore(endTime), filter);
339+
List<T> result = new LinkedList<>();
340+
try (Statement prepStmt = connection.createStatement()) {
341+
if (LOG.isLoggable(Level.FINER)) {
342+
LOG.finer(this + " - retrieve statement: " + finalQuery);
343+
}
344+
try (ResultSet rs = prepStmt.executeQuery(finalQuery)) {
345+
while (rs.next()) {
346+
try {
347+
T object = mapToItem(rs, filter);
348+
result.add(object);
349+
} catch (IOException | ClassNotFoundException e) {
350+
throw new SQLException(e);
351+
}
352+
}
353+
} finally {
354+
connection.commit();
355+
}
356+
}
357+
return result;
358+
}
359+
313360
protected List<T> doRetrieve(Connection connection, T startItem, int numRecords, RetrievalDirection direction, K filter) throws SQLException {
314361
// Use the startItem generationTime to retrieve all the items from that point in time: increase limit by 100
315362
List<T> largeSize = doRetrieve(connection, startItem.getGenerationTime(), startItem.getInternalId(), numRecords + LOOK_AHEAD_SPAN, direction, filter);
@@ -370,6 +417,16 @@ protected void addTimeInfo(StringBuilder query, Instant startTime, RetrievalDire
370417
}
371418
}
372419

420+
protected void addTimeRangeInfo(StringBuilder query, Instant startTime, Instant endTime, boolean ascending) {
421+
if (ascending) { // startTime < endTime
422+
query.append("GenerationTime >= '").append(toTimestamp(startTime).toString())
423+
.append("' AND GenerationTime <= '").append(toTimestamp(endTime).toString()).append("' ");
424+
} else { // endTime < startTime
425+
query.append("GenerationTime >= '").append(toTimestamp(endTime).toString())
426+
.append("' AND GenerationTime <= '").append(toTimestamp(startTime).toString()).append("' ");
427+
}
428+
}
429+
373430
public synchronized IUniqueId retrieveLastId() throws ArchiveException {
374431
if (LOG.isLoggable(Level.FINER)) {
375432
LOG.finer(this + " - retrieveLastId() called");
@@ -436,6 +493,8 @@ protected String getLastIdQuery(Class<? extends AbstractDataItem> type) throws U
436493
}
437494
}
438495

496+
497+
439498
public synchronized Instant retrieveLastGenerationTime() throws ArchiveException {
440499
return retrieveLastGenerationTime(getMainType());
441500
}

eu.dariolucia.reatmetric.persist/src/main/java/eu/dariolucia/reatmetric/persist/services/AcknowledgedMessageArchive.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,33 @@ protected String buildRetrieveQuery(Instant startTime, int numRecords, Retrieval
197197
return query.toString();
198198
}
199199

200+
@Override
201+
protected String buildRetrieveQuery(Instant startTime, Instant endTime, boolean ascending, AcknowledgedMessageFilter filter) {
202+
StringBuilder query = new StringBuilder("SELECT a.UniqueId, a.GenerationTime, a.State, a.UserName, a.AcknowledgementTime, a.AdditionalData, " +
203+
"b.UniqueId, b.GenerationTime, b.Id, b.Text, b.Source, b.Severity, b.LinkedEntityId, b.AdditionalData " +
204+
"FROM ACK_MESSAGE_TABLE as a JOIN OPERATIONAL_MESSAGE_TABLE as b " +
205+
"ON (a.MessageId = b.UniqueId) " +
206+
"WHERE a.");
207+
// add time info
208+
addTimeRangeInfo(query, startTime, endTime, ascending);
209+
// process filter
210+
if(filter != null && !filter.isClear()) {
211+
if(filter.getUserList() != null && !filter.getUserList().isEmpty()) {
212+
query.append("AND a.UserName IN (").append(toFilterListString(filter.getUserList(), o -> o, "'")).append(") ");
213+
}
214+
if(filter.getStateList() != null && !filter.getStateList().isEmpty()) {
215+
query.append("AND a.State IN (").append(toEnumFilterListString(filter.getStateList())).append(") ");
216+
}
217+
}
218+
// order by and limit
219+
if(ascending) {
220+
query.append("ORDER BY a.GenerationTime ASC, a.UniqueId ASC");
221+
} else {
222+
query.append("ORDER BY a.GenerationTime DESC, a.UniqueId DESC");
223+
}
224+
return query.toString();
225+
}
226+
200227
@Override
201228
protected String buildRetrieveQuery(Instant startTime, IUniqueId internalId, int numRecords, RetrievalDirection direction, AcknowledgedMessageFilter filter) {
202229
StringBuilder query = new StringBuilder("SELECT a.UniqueId, a.GenerationTime, a.State, a.UserName, a.AcknowledgementTime, a.AdditionalData, " +

eu.dariolucia.reatmetric.persist/src/main/java/eu/dariolucia/reatmetric/persist/services/ActivityOccurrenceDataArchive.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,53 @@ protected String buildRetrieveQuery(Instant startTime, int numRecords, Retrieval
394394
return query.toString();
395395
}
396396

397+
@Override
398+
protected String buildRetrieveQuery(Instant startTime, Instant endTime, boolean ascending, ActivityOccurrenceDataFilter filter) {
399+
StringBuilder query = new StringBuilder();
400+
query.append("SELECT ao.UniqueId,ao.GenerationTime,ao.ExternalId,ao.Name,ao.Path,ao.Type,ao.Route,ao.Source,ao.Arguments,ao.Properties,ao.AdditionalData," +
401+
"r.UniqueId,r.GenerationTime,r.Name,r.ExecutionTime,r.State,r.NextState,r.ReportStatus,r.Result,r.ActivityOccurrenceId,r.AdditionalData " +
402+
"FROM ACTIVITY_REPORT_DATA_TABLE AS r JOIN ");
403+
query.append("(SELECT UniqueId,GenerationTime,ExternalId,Name,Path,Type,Route,Source,Arguments,Properties,AdditionalData FROM ACTIVITY_OCCURRENCE_DATA_TABLE WHERE ");
404+
// add time info
405+
addTimeRangeInfo(query, startTime, endTime, ascending);
406+
// process filter
407+
if(filter != null && !filter.isClear()) {
408+
if(filter.getParentPath() != null) {
409+
query.append("AND Path LIKE '").append(filter.getParentPath().asString()).append("%' ");
410+
}
411+
if(filter.getActivityPathList() != null && !filter.getActivityPathList().isEmpty()) {
412+
query.append("AND Path IN (").append(toFilterListString(filter.getActivityPathList(), SystemEntityPath::asString, "'")).append(") ");
413+
}
414+
if(filter.getRouteList() != null && !filter.getRouteList().isEmpty()) {
415+
query.append("AND Route IN (").append(toFilterListString(filter.getRouteList(), o -> o, "'")).append(") ");
416+
}
417+
if(filter.getSourceList() != null && !filter.getSourceList().isEmpty()) {
418+
query.append("AND Source IN (").append(toFilterListString(filter.getSourceList(), o -> o, "'")).append(") ");
419+
}
420+
if(filter.getTypeList() != null && !filter.getTypeList().isEmpty()) {
421+
query.append("AND Type IN (").append(toFilterListString(filter.getTypeList(), o -> o, "'")).append(") ");
422+
}
423+
if(filter.getExternalIdList() != null && !filter.getExternalIdList().isEmpty()) {
424+
query.append("AND ExternalId IN (").append(toFilterListString(filter.getExternalIdList(), o -> o, null)).append(") ");
425+
}
426+
// For the activity occurrence state we use application post-filtering... for the time being
427+
}
428+
// order by and limit
429+
if(ascending) {
430+
query.append("ORDER BY GenerationTime ASC, UniqueId ASC ");
431+
} else {
432+
query.append("ORDER BY GenerationTime DESC, UniqueId DESC ");
433+
}
434+
query.append(") AS ao ON ao.UniqueId = r.ActivityOccurrenceId ");
435+
// order by and limit
436+
if(ascending) {
437+
query.append("ORDER BY ao.GenerationTime ASC, ao.UniqueId ASC, r.UniqueId ASC");
438+
} else {
439+
query.append("ORDER BY ao.GenerationTime DESC, ao.UniqueId DESC, r.UniqueId ASC");
440+
}
441+
return query.toString();
442+
}
443+
397444
@Override
398445
protected String buildRetrieveQuery(Instant startTime, IUniqueId internalId, int numRecords, RetrievalDirection direction, ActivityOccurrenceDataFilter filter) {
399446
StringBuilder query = new StringBuilder();

0 commit comments

Comments
 (0)