Skip to content
This repository was archived by the owner on Sep 1, 2024. It is now read-only.

Commit f8920ab

Browse files
committed
Add archive support for acknowledgement of messages
1 parent 7944d3d commit f8920ab

8 files changed

Lines changed: 337 additions & 18 deletions

File tree

eu.dariolucia.reatmetric.api/src/main/java/eu/dariolucia/reatmetric/api/messages/AcknowledgedMessage.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class AcknowledgedMessage extends AbstractDataItem implements Serializabl
3737

3838
private final String user;
3939

40-
public AcknowledgedMessage(IUniqueId internalId, Instant generationTime, Object extension, OperationalMessage message, AcknowledgementState state, Instant acknowledgementTime, String user) {
40+
public AcknowledgedMessage(IUniqueId internalId, Instant generationTime, OperationalMessage message, AcknowledgementState state, Instant acknowledgementTime, String user, Object extension) {
4141
super(internalId, generationTime, extension);
4242
this.message = message;
4343
this.state = state;
@@ -62,7 +62,7 @@ public String getUser() {
6262
}
6363

6464
public AcknowledgedMessage ack(String user) {
65-
return new AcknowledgedMessage(getInternalId(), getGenerationTime(), getExtension(), getMessage(), AcknowledgementState.ACKNOWLEDGED, Instant.now(), user);
65+
return new AcknowledgedMessage(getInternalId(), getGenerationTime(), getMessage(), AcknowledgementState.ACKNOWLEDGED, Instant.now(), user, getExtension());
6666
}
6767

6868
@Override

eu.dariolucia.reatmetric.core/src/main/java/eu/dariolucia/reatmetric/core/impl/AcknowledgedMessageBrokerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ private List<AcknowledgedMessage> process(List<OperationalMessage> items) {
130130
for(OperationalMessage om : items) {
131131
if(toBeAcknowledged(om)) {
132132
IUniqueId idToAssign = nextOperationalMessageId();
133-
AcknowledgedMessage toAck = new AcknowledgedMessage(idToAssign, om.getGenerationTime(), null, om, AcknowledgementState.PENDING, null, null);
133+
AcknowledgedMessage toAck = new AcknowledgedMessage(idToAssign, om.getGenerationTime(), om, AcknowledgementState.PENDING, null, null, null);
134134
toReturn.add(toAck);
135135
}
136136
}

eu.dariolucia.reatmetric.persist/src/main/java/eu/dariolucia/reatmetric/persist/Archive.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import eu.dariolucia.reatmetric.api.common.AbstractDataItemFilter;
2626
import eu.dariolucia.reatmetric.api.common.DebugInformation;
2727
import eu.dariolucia.reatmetric.api.events.IEventDataArchive;
28+
import eu.dariolucia.reatmetric.api.messages.IAcknowledgedMessageArchive;
2829
import eu.dariolucia.reatmetric.api.messages.IOperationalMessageArchive;
2930
import eu.dariolucia.reatmetric.api.parameters.IParameterDataArchive;
3031
import eu.dariolucia.reatmetric.api.rawdata.IRawDataArchive;
@@ -83,6 +84,7 @@ private void initialiseArchiveServices() throws ArchiveException {
8384
registeredArchives.put(IParameterDataArchive.class, new ParameterDataArchive(this));
8485
registeredArchives.put(IAlarmParameterDataArchive.class, new AlarmParameterDataArchive(this));
8586
registeredArchives.put(IActivityOccurrenceDataArchive.class, new ActivityOccurrenceDataArchive(this));
87+
registeredArchives.put(IAcknowledgedMessageArchive.class, new AcknowledgedMessageArchive(this));
8688
} catch (SQLException e) {
8789
throw new ArchiveException(e);
8890
}

eu.dariolucia.reatmetric.persist/src/main/java/eu/dariolucia/reatmetric/persist/services/AbstractDataItemArchive.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -524,15 +524,15 @@ public synchronized void dispose() throws ArchiveException {
524524
* Conversion methods
525525
**************************************************************************************************/
526526

527-
protected Timestamp toTimestamp(Instant t) {
527+
protected static Timestamp toTimestamp(Instant t) {
528528
return t == null ? null : Timestamp.from(t);
529529
}
530530

531-
protected Instant toInstant(Timestamp genTime) {
531+
protected static Instant toInstant(Timestamp genTime) {
532532
return genTime == null ? null : genTime.toInstant();
533533
}
534534

535-
protected Object toObject(Blob b) throws IOException, SQLException {
535+
protected static Object toObject(Blob b) throws IOException, SQLException {
536536
Object toReturn = null;
537537
if(b != null) {
538538
InputStream ois = b.getBinaryStream();
@@ -541,22 +541,22 @@ protected Object toObject(Blob b) throws IOException, SQLException {
541541
return toReturn;
542542
}
543543

544-
protected InputStream toInputstream(Object data) {
544+
protected static InputStream toInputstream(Object data) {
545545
if(data == null) {
546546
return null;
547547
}
548548
return new ByteArrayInputStream(ValueUtil.serialize(data));
549549
}
550550

551-
protected byte[] toByteArray(InputStream is) throws IOException {
551+
protected static byte[] toByteArray(InputStream is) throws IOException {
552552
return is.readAllBytes();
553553
}
554554

555-
protected <E extends Enum<E>> String toEnumFilterListString(Set<E> enumList) {
555+
protected static <E extends Enum<E>> String toEnumFilterListString(Set<E> enumList) {
556556
return toFilterListString(enumList, Enum::ordinal, null);
557557
}
558558

559-
protected <E> String toFilterListString(Set<E> list, Function<E, Object> extractor, String delimiter) {
559+
protected static <E> String toFilterListString(Set<E> list, Function<E, Object> extractor, String delimiter) {
560560
StringBuilder sb = new StringBuilder();
561561
int i = 0;
562562
for(E o : list) {
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Copyright (c) 2020 Dario Lucia (https://www.dariolucia.eu)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package eu.dariolucia.reatmetric.persist.services;
18+
19+
import eu.dariolucia.reatmetric.api.common.AbstractDataItem;
20+
import eu.dariolucia.reatmetric.api.common.IUniqueId;
21+
import eu.dariolucia.reatmetric.api.common.LongUniqueId;
22+
import eu.dariolucia.reatmetric.api.common.RetrievalDirection;
23+
import eu.dariolucia.reatmetric.api.messages.*;
24+
import eu.dariolucia.reatmetric.persist.Archive;
25+
26+
import java.io.IOException;
27+
import java.sql.*;
28+
import java.time.Instant;
29+
import java.util.Arrays;
30+
import java.util.List;
31+
import java.util.logging.Level;
32+
import java.util.logging.Logger;
33+
34+
public class AcknowledgedMessageArchive extends AbstractDataItemArchive<AcknowledgedMessage, AcknowledgedMessageFilter> implements IAcknowledgedMessageArchive {
35+
36+
private static final Logger LOG = Logger.getLogger(AcknowledgedMessageArchive.class.getName());
37+
38+
private static final String INSERT_STATEMENT = "INSERT INTO ACK_MESSAGE_TABLE(UniqueId,GenerationTime,MessageId,State,UserName,AcknowledgementTime,AdditionalData) VALUES (?,?,?,?,?,?,?)";
39+
private static final String UPDATE_STATEMENT = "UPDATE ACK_MESSAGE_TABLE SET GenerationTime = ?, MessageId = ?, State = ?, UserName = ?, AcknowledgementTime = ?, AdditionalData = ? WHERE UniqueId = ?";
40+
private static final String STORE_STATEMENT = "MERGE INTO ACK_MESSAGE_TABLE USING SYSIBM.SYSDUMMY1 ON UniqueId = ? " +
41+
"WHEN MATCHED THEN UPDATE SET GenerationTime = ?, MessageId = ?, State = ?, UserName = ?, AcknowledgementTime = ?, AdditionalData = ? " +
42+
"WHEN NOT MATCHED THEN INSERT (UniqueId,GenerationTime,MessageId,State,UserName,AcknowledgementTime,AdditionalData) VALUES (?,?,?,?,?,?,?)";
43+
private static final String LAST_ID_QUERY = "SELECT UniqueId FROM ACK_MESSAGE_TABLE ORDER BY UniqueId DESC FETCH FIRST ROW ONLY";
44+
private static final String RETRIEVE_BY_ID_QUERY = "SELECT a.UniqueId, a.GenerationTime, a.State, a.UserName, a.AcknowledgementTime, a.AdditionalData, " +
45+
"b.UniqueId, b.GenerationTime, b.Id, b.Text, b.Source, b.Severity, b.AdditionalData " +
46+
"FROM ACK_MESSAGE_TABLE as a JOIN OPERATIONAL_MESSAGE_TABLE as b " +
47+
"ON (a.MessageId = b.UniqueId) " +
48+
"WHERE UniqueId=?";
49+
private static final String LAST_GENERATION_TIME_QUERY = "SELECT MAX(GenerationTime) FROM ACK_MESSAGE_TABLE";
50+
51+
public AcknowledgedMessageArchive(Archive controller) throws SQLException {
52+
super(controller);
53+
}
54+
55+
@Override
56+
protected void setItemPropertiesToStatement(PreparedStatement storeStatement, AcknowledgedMessage item) throws SQLException {
57+
storeStatement.setLong(1, item.getInternalId().asLong());
58+
storeStatement.setTimestamp(2, toTimestamp(item.getGenerationTime()));
59+
storeStatement.setLong(3, item.getMessage().getInternalId().asLong());
60+
storeStatement.setShort(4, (short) item.getState().ordinal());
61+
if(item.getUser() == null) {
62+
storeStatement.setNull(5, Types.VARCHAR);
63+
} else {
64+
storeStatement.setString(5, item.getUser());
65+
}
66+
storeStatement.setTimestamp(6, toTimestamp(item.getAcknowledgementTime()));
67+
Object extension = item.getExtension();
68+
if(extension == null) {
69+
storeStatement.setNull(7, Types.BLOB);
70+
} else {
71+
storeStatement.setBlob(7, toInputstream(item.getExtension()));
72+
}
73+
74+
storeStatement.setLong(8, item.getInternalId().asLong());
75+
storeStatement.setTimestamp(9, toTimestamp(item.getGenerationTime()));
76+
storeStatement.setLong(10, item.getMessage().getInternalId().asLong());
77+
storeStatement.setShort(11, (short) item.getState().ordinal());
78+
if(item.getUser() == null) {
79+
storeStatement.setNull(12, Types.VARCHAR);
80+
} else {
81+
storeStatement.setString(12, item.getUser());
82+
}
83+
storeStatement.setTimestamp(13, toTimestamp(item.getAcknowledgementTime()));
84+
if(extension == null) {
85+
storeStatement.setNull(14, Types.BLOB);
86+
} else {
87+
storeStatement.setBlob(14, toInputstream(item.getExtension()));
88+
}
89+
}
90+
91+
@Override
92+
protected PreparedStatement createStoreStatement(Connection connection) throws SQLException {
93+
if(LOG.isLoggable(Level.FINEST)) {
94+
LOG.finest(this + " - preparing store statement: " + STORE_STATEMENT);
95+
}
96+
return connection.prepareStatement(STORE_STATEMENT);
97+
}
98+
99+
@Override
100+
protected String buildRetrieveByIdQuery() {
101+
return RETRIEVE_BY_ID_QUERY;
102+
}
103+
104+
@Override
105+
protected String buildRetrieveQuery(Instant startTime, int numRecords, RetrievalDirection direction, AcknowledgedMessageFilter filter) {
106+
StringBuilder query = new StringBuilder("SELECT a.UniqueId, a.GenerationTime, a.State, a.UserName, a.AcknowledgementTime, a.AdditionalData, " +
107+
"b.UniqueId, b.GenerationTime, b.Id, b.Text, b.Source, b.Severity, b.AdditionalData " +
108+
"FROM ACK_MESSAGE_TABLE as a JOIN OPERATIONAL_MESSAGE_TABLE as b " +
109+
"ON (a.MessageId = b.UniqueId) " +
110+
"WHERE a.");
111+
// add time info
112+
addTimeInfo(query, startTime, direction);
113+
// process filter
114+
if(filter != null && !filter.isClear()) {
115+
if(filter.getUserList() != null && !filter.getUserList().isEmpty()) {
116+
query.append("AND a.UserName IN (").append(toFilterListString(filter.getUserList(), o -> o, "'")).append(") ");
117+
}
118+
if(filter.getStateList() != null && !filter.getStateList().isEmpty()) {
119+
query.append("AND a.State IN (").append(toEnumFilterListString(filter.getStateList())).append(") ");
120+
}
121+
}
122+
// order by and limit
123+
if(direction == RetrievalDirection.TO_FUTURE) {
124+
query.append("ORDER BY a.GenerationTime ASC, a.UniqueId ASC FETCH NEXT ").append(numRecords).append(" ROWS ONLY");
125+
} else {
126+
query.append("ORDER BY a.GenerationTime DESC, a.UniqueId DESC FETCH NEXT ").append(numRecords).append(" ROWS ONLY");
127+
}
128+
return query.toString();
129+
}
130+
131+
@Override
132+
protected String buildRetrieveQuery(Instant startTime, IUniqueId internalId, int numRecords, RetrievalDirection direction, AcknowledgedMessageFilter filter) {
133+
StringBuilder query = new StringBuilder("SELECT a.UniqueId, a.GenerationTime, a.State, a.UserName, a.AcknowledgementTime, a.AdditionalData, " +
134+
"b.UniqueId, b.GenerationTime, b.Id, b.Text, b.Source, b.Severity, b.AdditionalData " +
135+
"FROM ACK_MESSAGE_TABLE as a JOIN OPERATIONAL_MESSAGE_TABLE as b " +
136+
"ON (a.MessageId = b.UniqueId) " +
137+
"WHERE a.");
138+
// add time info
139+
addTimeInfo(query, startTime, internalId, direction);
140+
// process filter
141+
if(filter != null && !filter.isClear()) {
142+
if(filter.getUserList() != null && !filter.getUserList().isEmpty()) {
143+
query.append("AND a.UserName IN (").append(toFilterListString(filter.getUserList(), o -> o, "'")).append(") ");
144+
}
145+
if(filter.getStateList() != null && !filter.getStateList().isEmpty()) {
146+
query.append("AND a.State IN (").append(toEnumFilterListString(filter.getStateList())).append(") ");
147+
}
148+
}
149+
// order by and limit
150+
if(direction == RetrievalDirection.TO_FUTURE) {
151+
query.append("ORDER BY a.GenerationTime ASC, a.UniqueId ASC FETCH NEXT ").append(numRecords).append(" ROWS ONLY");
152+
} else {
153+
query.append("ORDER BY a.GenerationTime DESC, a.UniqueId DESC FETCH NEXT ").append(numRecords).append(" ROWS ONLY");
154+
}
155+
return query.toString();
156+
}
157+
158+
@Override
159+
protected AcknowledgedMessage mapToItem(ResultSet rs, AcknowledgedMessageFilter usedFilder) throws SQLException, IOException {
160+
OperationalMessage om = OperationalMessageArchive.mapToItem(rs, null, 6);
161+
long uniqueId = rs.getLong(1);
162+
Timestamp genTime = rs.getTimestamp(2);
163+
AcknowledgementState state = AcknowledgementState.values()[rs.getShort(3)];
164+
String user = rs.getString(4);
165+
Timestamp ackTime = rs.getTimestamp(5);
166+
Blob extensionBlob = rs.getBlob(6);
167+
Object extension = null;
168+
if(extensionBlob != null && !rs.wasNull()) {
169+
extension = toObject(extensionBlob);
170+
}
171+
return new AcknowledgedMessage(new LongUniqueId(uniqueId), toInstant(genTime), om, state, toInstant(ackTime), user, extension);
172+
}
173+
174+
@Override
175+
protected String getLastIdQuery() {
176+
return LAST_ID_QUERY;
177+
}
178+
179+
@Override
180+
protected String getLastGenerationTimeQuery(Class<? extends AbstractDataItem> type) {
181+
return LAST_GENERATION_TIME_QUERY;
182+
}
183+
184+
@Override
185+
protected List<String> getPurgeQuery(Instant referenceTime, RetrievalDirection direction) {
186+
return Arrays.asList(
187+
"DELETE FROM ACK_MESSAGE_TABLE WHERE GenerationTime " + (direction == RetrievalDirection.TO_FUTURE ? ">" : "<") + "'" + toTimestamp(referenceTime) + "'"
188+
);
189+
}
190+
191+
@Override
192+
public String toString() {
193+
return "Acknowledgement Message Archive";
194+
}
195+
196+
@Override
197+
protected Class<AcknowledgedMessage> getMainType() {
198+
return AcknowledgedMessage.class;
199+
}
200+
}

eu.dariolucia.reatmetric.persist/src/main/java/eu/dariolucia/reatmetric/persist/services/OperationalMessageArchive.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,18 @@ protected String buildRetrieveQuery(Instant startTime, IUniqueId internalId, int
135135
}
136136

137137
@Override
138-
protected OperationalMessage mapToItem(ResultSet rs, OperationalMessageFilter usedFilder) throws SQLException, IOException {
139-
long uniqueId = rs.getLong(1);
140-
Timestamp genTime = rs.getTimestamp(2);
141-
String messageId = rs.getString(3);
142-
String messageText = rs.getString(4);
143-
String messageSource = rs.getString(5);
144-
Severity severity = Severity.values()[rs.getShort(6)];
145-
Blob extensionBlob = rs.getBlob(7);
138+
protected OperationalMessage mapToItem(ResultSet rs, OperationalMessageFilter usedFilter) throws SQLException, IOException {
139+
return mapToItem(rs, usedFilter, 0);
140+
}
141+
142+
static OperationalMessage mapToItem(ResultSet rs, OperationalMessageFilter usedFilter, int offset) throws SQLException, IOException {
143+
long uniqueId = rs.getLong(offset + 1);
144+
Timestamp genTime = rs.getTimestamp(offset + 2);
145+
String messageId = rs.getString(offset + 3);
146+
String messageText = rs.getString(offset + 4);
147+
String messageSource = rs.getString(offset + 5);
148+
Severity severity = Severity.values()[rs.getShort(offset + 6)];
149+
Blob extensionBlob = rs.getBlob(offset + 7);
146150
Object extension = null;
147151
if(extensionBlob != null && !rs.wasNull()) {
148152
extension = toObject(extensionBlob);

eu.dariolucia.reatmetric.persist/src/main/resources/schema.ddl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,17 @@ CREATE TABLE OPERATIONAL_MESSAGE_TABLE (
99
PRIMARY KEY (UniqueId)
1010
)
1111
-- SEPARATOR
12+
CREATE TABLE ACK_MESSAGE_TABLE (
13+
UniqueId BIGINT NOT NULL,
14+
GenerationTime TIMESTAMP NOT NULL,
15+
MessageId BIGINT NOT NULL,
16+
State SMALLINT NOT NULL,
17+
UserName VARCHAR(32),
18+
AcknowledgementTime TIMESTAMP,
19+
AdditionalData BLOB,
20+
PRIMARY KEY (UniqueId)
21+
)
22+
-- SEPARATOR
1223
CREATE TABLE EVENT_DATA_TABLE (
1324
UniqueId BIGINT NOT NULL,
1425
GenerationTime TIMESTAMP NOT NULL,

0 commit comments

Comments
 (0)