Skip to content
This repository was archived by the owner on Sep 1, 2024. It is now read-only.

Commit e5d9475

Browse files
committed
Implemented autoconnect for transport connectors
1 parent c7cf673 commit e5d9475

14 files changed

Lines changed: 334 additions & 20 deletions

File tree

eu.dariolucia.reatmetric.api/src/main/java/eu/dariolucia/reatmetric/api/transport/AbstractTransportConnector.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ abstract public class AbstractTransportConnector implements ITransportConnector
5050
private volatile boolean prepared = false;
5151
private volatile boolean initialised = false;
5252
private volatile boolean busy = false;
53+
private volatile boolean autoReconnect = false;
5354

5455
protected AbstractTransportConnector(String name, String description) {
5556
this();
@@ -128,6 +129,20 @@ public boolean isInitialised() {
128129
return initialised;
129130
}
130131

132+
@Override
133+
public void setReconnect(boolean autoReconnect) {
134+
boolean toNotify = !Objects.equals(autoReconnect, this.autoReconnect);
135+
this.autoReconnect = autoReconnect;
136+
if(toNotify) {
137+
notifySubscribers();
138+
}
139+
}
140+
141+
@Override
142+
public boolean isReconnect() {
143+
return this.autoReconnect;
144+
}
145+
131146
@Override
132147
public void initialise(Map<String, Object> properties) {
133148
checkPrepared();
@@ -216,7 +231,7 @@ public void dispose() {
216231
protected abstract void doDispose();
217232

218233
private void notifySubscribers() {
219-
TransportStatus status = new TransportStatus(name, connectionStatus, lastTxRate, lastRxRate, lastAlarmState);
234+
TransportStatus status = new TransportStatus(name, connectionStatus, lastTxRate, lastRxRate, lastAlarmState, autoReconnect);
220235
this.subscribers.forEach((s) -> {
221236
try {
222237
if(LOG.isLoggable(Level.FINER)) {

eu.dariolucia.reatmetric.api/src/main/java/eu/dariolucia/reatmetric/api/transport/ITransportConnector.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,35 +40,41 @@ public interface ITransportConnector extends Remote {
4040
* Return the name of the connector.
4141
*
4242
* @return the name of the connector
43+
* @throws RemoteException in case of issues on the remote side
4344
*/
4445
String getName() throws RemoteException;
4546

4647
/**
4748
* Return the description of the connector.
4849
*
4950
* @return the description of the connector
51+
* @throws RemoteException in case of issues on the remote side
5052
*/
5153
String getDescription() throws RemoteException;
5254

5355
/**
5456
* Return the current connection status.
5557
*
5658
* @return the current connection status
59+
* @throws RemoteException in case of issues on the remote side
5760
*/
5861
TransportConnectionStatus getConnectionStatus() throws RemoteException;
5962

6063
/**
6164
* Ask the connector to finalise its construction. This method shall be called before calling any of the other
62-
* {@link ITransportConnector} methods. Failing in doing so may result in undefined behaviour.
65+
* {@link ITransportConnector} methods. As such, it is typically invoked by the driver responsible for the creation
66+
* of the connector. Failing to invoke this method may result in undefined behaviour.
6367
*
6468
* Not supposed to be remoted.
69+
* @throws RemoteException in case of issues on the remote side
6570
*/
6671
void prepare() throws RemoteException;
6772

6873
/**
6974
* Return the initialisation status.
7075
*
7176
* @return true if the connector is initialised, otherwise false
77+
* @throws RemoteException in case of issues on the remote side
7278
*/
7379
boolean isInitialised() throws RemoteException;
7480

@@ -77,27 +83,31 @@ public interface ITransportConnector extends Remote {
7783
*
7884
* @param properties the properties that this connector shall use
7985
* @throws TransportException in case of problems
86+
* @throws RemoteException in case of issues on the remote side
8087
*/
8188
void initialise(Map<String, Object> properties) throws TransportException, RemoteException;
8289

8390
/**
8491
* Ask the connector to perform the connection to the external system.
8592
*
8693
* @throws TransportException in case of problems
94+
* @throws RemoteException in case of issues on the remote side
8795
*/
8896
void connect() throws TransportException, RemoteException;
8997

9098
/**
9199
* Ask the connector to disconnect from the external system.
92100
*
93101
* @throws TransportException in case of problems
102+
* @throws RemoteException in case of issues on the remote side
94103
*/
95104
void disconnect() throws TransportException, RemoteException;
96105

97106
/**
98107
* Ask the connector to abort the connection to the external system.
99108
*
100109
* @throws TransportException in case of problems
110+
* @throws RemoteException in case of issues on the remote side
101111
*/
102112
void abort() throws TransportException, RemoteException;
103113

@@ -106,6 +116,7 @@ public interface ITransportConnector extends Remote {
106116
* implementation after calling this method may result in undefined behaviour.
107117
*
108118
* Not supposed to be remoted.
119+
* @throws RemoteException in case of issues on the remote side
109120
*/
110121
void dispose() throws RemoteException;
111122

@@ -115,27 +126,46 @@ public interface ITransportConnector extends Remote {
115126
* required value type.
116127
*
117128
* @return the initialisation map
129+
* @throws RemoteException in case of issues on the remote side
118130
*/
119131
Map<String, Pair<String, ValueTypeEnum>> getSupportedProperties() throws RemoteException;
120132

121133
/**
122134
* Return the current initialisation property values.
123135
*
124136
* @return the current initialisation property values
137+
* @throws RemoteException in case of issues on the remote side
125138
*/
126139
Map<String, Object> getCurrentProperties() throws RemoteException;
127140

128141
/**
129142
* Register a subscriber to this transport connector.
130143
*
131144
* @param listener the subscriber to register
145+
* @throws RemoteException in case of issues on the remote side
132146
*/
133147
void register(ITransportSubscriber listener) throws RemoteException;
134148

135149
/**
136150
* Deregister a subscriber from this transport connector.
137151
*
138152
* @param listener the subscriber to deregister
153+
* @throws RemoteException in case of issues on the remote side
139154
*/
140155
void deregister(ITransportSubscriber listener) throws RemoteException;
156+
157+
/**
158+
* Mark this transport connector to enable auto-reconnection when the connection is lost.
159+
* @param reconnect true to enable reconnection, otherwise false
160+
* @throws RemoteException in case of issues on the remote side
161+
*/
162+
void setReconnect(boolean reconnect) throws RemoteException;
163+
164+
/**
165+
* Give back the value of the reconnect flag.
166+
*
167+
* @return the reconnect flag value
168+
* @throws RemoteException in case of issues on the remote side
169+
*/
170+
boolean isReconnect() throws RemoteException;
141171
}

eu.dariolucia.reatmetric.api/src/main/java/eu/dariolucia/reatmetric/api/transport/TransportStatus.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,15 @@ public final class TransportStatus implements Serializable {
3030
private final AlarmState alarmState;
3131
private final long txRate;
3232
private final long rxRate;
33+
private final boolean autoReconnect;
3334

34-
public TransportStatus(String name, TransportConnectionStatus status, long txRate, long rxRate, AlarmState alarmState) {
35+
public TransportStatus(String name, TransportConnectionStatus status, long txRate, long rxRate, AlarmState alarmState, boolean autoReconnect) {
3536
this.name = name;
3637
this.status = status;
3738
this.txRate = txRate;
3839
this.rxRate = rxRate;
3940
this.alarmState = alarmState;
41+
this.autoReconnect = autoReconnect;
4042
}
4143

4244
/**
@@ -85,6 +87,15 @@ public AlarmState getAlarmState() {
8587
return alarmState;
8688
}
8789

90+
/**
91+
* The auto-reconnection of the transport status.
92+
*
93+
* @return the auto-reconnection flag linked to this connector
94+
*/
95+
public boolean isAutoReconnect() {
96+
return autoReconnect;
97+
}
98+
8899
@Override
89100
public String toString() {
90101
return "TransportStatus{" +
@@ -93,6 +104,7 @@ public String toString() {
93104
", txRate=" + txRate +
94105
", rxRate=" + rxRate +
95106
", alarmState=" + alarmState +
107+
", autoReconnect=" + autoReconnect +
96108
'}';
97109
}
98110
}

eu.dariolucia.reatmetric.core/src/main/java/eu/dariolucia/reatmetric/core/ReatmetricSystemImpl.java

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@
4040
import eu.dariolucia.reatmetric.api.scheduler.IScheduler;
4141
import eu.dariolucia.reatmetric.api.scheduler.ISchedulerFactory;
4242
import eu.dariolucia.reatmetric.api.transport.ITransportConnector;
43+
import eu.dariolucia.reatmetric.api.transport.ITransportSubscriber;
44+
import eu.dariolucia.reatmetric.api.transport.TransportConnectionStatus;
45+
import eu.dariolucia.reatmetric.api.transport.TransportStatus;
46+
import eu.dariolucia.reatmetric.api.transport.exceptions.TransportException;
4347
import eu.dariolucia.reatmetric.core.api.*;
4448
import eu.dariolucia.reatmetric.core.api.exceptions.DriverException;
4549
import eu.dariolucia.reatmetric.core.configuration.DriverConfiguration;
@@ -60,13 +64,14 @@
6064
/**
6165
* Reference ReatMetric system implementation.
6266
*/
63-
public class ReatmetricSystemImpl implements IReatmetricSystem, IServiceCoreContext, IDriverListener {
67+
public class ReatmetricSystemImpl implements IReatmetricSystem, IServiceCoreContext, IDriverListener, ITransportSubscriber {
6468

6569
private static final Logger LOG = Logger.getLogger(ReatmetricSystemImpl.class.getName());
6670

6771
private final List<IDriver> drivers = new ArrayList<>();
6872
private final List<ITransportConnector> transportConnectors = new ArrayList<>();
6973
private final List<ITransportConnector> transportConnectorsImmutable = Collections.unmodifiableList(transportConnectors);
74+
7075
private final Map<Pair<String, String>, IRawDataRenderer> renderers = new HashMap<>();
7176
private final ServiceCoreConfiguration configuration;
7277

@@ -81,6 +86,8 @@ public class ReatmetricSystemImpl implements IReatmetricSystem, IServiceCoreCont
8186

8287
private volatile boolean initialised = false;
8388

89+
private final Timer executor = new Timer("ReatMetric Core - Executor", true);
90+
8491
public ReatmetricSystemImpl(String initString) throws ReatmetricException {
8592
try {
8693
configuration = ServiceCoreConfiguration.load(new FileInputStream(initString));
@@ -159,13 +166,55 @@ public void initialise(Consumer<SystemStatus> consumer) throws ReatmetricExcepti
159166
// Ignore for this method
160167
}
161168
}
169+
// Check the connection status of the connectors (autoconnect, reconnect)
170+
initialiseConnectorsProperties();
162171
// Derive system status
163172
deriveSystemStatus();
164173
initialised = true;
165174
// Done and ready to go
166175
LOG.info("Reatmetric Core System loaded with status " + systemStatus);
167176
}
168177

178+
private void initialiseConnectorsProperties() {
179+
List<ITransportConnector> toBeStartedNow = new LinkedList<>();
180+
for(ITransportConnector connector : transportConnectors) {
181+
// Reconnection flag
182+
boolean isReconnectExcluded = configuration.getAutostartConnectors().getReconnectExclusions().stream().anyMatch(o -> {
183+
try {
184+
return connector.getName().matches(o);
185+
} catch (RemoteException e) {
186+
LOG.log(Level.WARNING, "Cannot retrieve name of transport connector: " + e.getMessage(), e);
187+
return true;
188+
}
189+
});
190+
try {
191+
connector.setReconnect(configuration.getAutostartConnectors().isReconnect() && !isReconnectExcluded);
192+
} catch (RemoteException e) {
193+
LOG.log(Level.WARNING, "Cannot set reconnection status of transport connector: " + e.getMessage(), e);
194+
}
195+
boolean isAutostartExcluded = configuration.getAutostartConnectors().getStartupExclusions().stream().anyMatch(o -> {
196+
try {
197+
return connector.getName().matches(o);
198+
} catch (RemoteException e) {
199+
LOG.log(Level.WARNING, "Cannot retrieve name of transport connector: " + e.getMessage(), e);
200+
return true;
201+
}
202+
});
203+
if(configuration.getAutostartConnectors().isStartup() && !isAutostartExcluded) {
204+
toBeStartedNow.add(connector);
205+
}
206+
}
207+
// Start now!
208+
for(ITransportConnector connector : toBeStartedNow) {
209+
try {
210+
connector.initialise(connector.getCurrentProperties());
211+
connector.connect();
212+
} catch (Exception e) { // Protect the startup sequence!
213+
LOG.log(Level.WARNING, "Cannot start transport connector: " + e.getMessage(), e);
214+
}
215+
}
216+
}
217+
169218
@Override
170219
public SystemStatus getStatus() {
171220
return systemStatus;
@@ -198,6 +247,13 @@ private void deriveSystemStatus() {
198247
@Override
199248
public void dispose() throws ReatmetricException {
200249
initialised = false;
250+
for(ITransportConnector tc : transportConnectors) {
251+
try {
252+
tc.deregister(this);
253+
} catch (RemoteException e) {
254+
LOG.log(Level.WARNING, "Cannot deregister from transport connector: " + e.getMessage(), e);
255+
}
256+
}
201257
for(IDriver d : drivers) {
202258
d.dispose();
203259
}
@@ -239,6 +295,13 @@ private void registerActivityHandlers(String driverName, List<IActivityHandler>
239295

240296
private void registerConnectors(List<ITransportConnector> transportConnectors) {
241297
this.transportConnectors.addAll(transportConnectors);
298+
transportConnectors.forEach(o -> {
299+
try {
300+
o.register(this);
301+
} catch (RemoteException e) {
302+
LOG.log(Level.WARNING, "Cannot register to transport connector: " + e.getMessage(), e);
303+
}
304+
});
242305
}
243306

244307
private IDriver loadDriver(DriverConfiguration dc) throws DriverException {
@@ -285,7 +348,7 @@ public IOperationalMessageProvisionService getOperationalMessageMonitorService()
285348
}
286349

287350
@Override
288-
public IOperationalMessageCollectorService getOperationalMessageCollectorService() throws ReatmetricException, RemoteException {
351+
public IOperationalMessageCollectorService getOperationalMessageCollectorService() {
289352
return messageBroker.getCollectorService();
290353
}
291354

@@ -404,4 +467,31 @@ public List<DebugInformation> currentDebugInfo() {
404467
//
405468
return toReturn;
406469
}
470+
471+
@Override
472+
public void status(TransportStatus status) throws RemoteException {
473+
if(status.isAutoReconnect() &&
474+
(status.getStatus() == TransportConnectionStatus.ERROR ||
475+
status.getStatus() == TransportConnectionStatus.ABORTED ||
476+
status.getStatus() == TransportConnectionStatus.IDLE)) {
477+
// Look for the connector
478+
for(ITransportConnector c : transportConnectors) {
479+
if(c.getName().equals(status.getName())) {
480+
executor.schedule(new TimerTask() {
481+
@Override
482+
public void run() {
483+
try {
484+
if(c.getConnectionStatus() != TransportConnectionStatus.OPEN && c.getConnectionStatus() != TransportConnectionStatus.CONNECTING) {
485+
c.connect();
486+
}
487+
} catch (RemoteException | TransportException e) {
488+
LOG.log(Level.WARNING, "Cannot reconnect transport connector: " + e.getMessage(), e);
489+
}
490+
}
491+
}, 500);
492+
return;
493+
}
494+
}
495+
}
496+
}
407497
}

0 commit comments

Comments
 (0)