Skip to content
This repository was archived by the owner on Sep 1, 2024. It is now read-only.

Commit 346f595

Browse files
committed
Test fixes; initial implementation of raw TCP CLTU/CADU connector
1 parent ea4028a commit 346f595

6 files changed

Lines changed: 246 additions & 3 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/*
2+
* Copyright (c) 2023 Dario Lucia (https://www.dariolucia.eu)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package eu.dariolucia.reatmetric.driver.spacecraft.connectors;
19+
20+
import eu.dariolucia.reatmetric.api.common.Pair;
21+
import eu.dariolucia.reatmetric.api.transport.AbstractTransportConnector;
22+
import eu.dariolucia.reatmetric.api.transport.exceptions.TransportException;
23+
import eu.dariolucia.reatmetric.api.value.StringUtil;
24+
import eu.dariolucia.reatmetric.core.api.IServiceCoreContext;
25+
import eu.dariolucia.reatmetric.driver.spacecraft.activity.ForwardDataUnitProcessingStatus;
26+
import eu.dariolucia.reatmetric.driver.spacecraft.activity.IForwardDataUnitStatusSubscriber;
27+
import eu.dariolucia.reatmetric.driver.spacecraft.activity.cltu.ICltuConnector;
28+
import eu.dariolucia.reatmetric.driver.spacecraft.definition.SpacecraftConfiguration;
29+
30+
import java.io.IOException;
31+
import java.io.InputStream;
32+
import java.net.Socket;
33+
import java.rmi.RemoteException;
34+
import java.time.Instant;
35+
import java.util.Collections;
36+
import java.util.List;
37+
import java.util.concurrent.CopyOnWriteArrayList;
38+
import java.util.concurrent.ExecutorService;
39+
import java.util.concurrent.Executors;
40+
import java.util.logging.Level;
41+
import java.util.logging.Logger;
42+
43+
/**
44+
* This connector established a TCP/IP connection to a host on a given port, and uses this connection to:
45+
* <ul>
46+
* <li>Receive CADUs</li>
47+
* <li>Send CLTUs</li>
48+
* </ul>
49+
*/
50+
public class CltuCaduTcpConnector extends AbstractTransportConnector implements ICltuConnector {
51+
private static final Logger LOG = Logger.getLogger(CltuCaduTcpConnector.class.getName());
52+
53+
private volatile Instant lastSamplingTime;
54+
private volatile long rxBytes; // injected bytes
55+
private volatile long txBytes; // injected bytes
56+
private String driverName;
57+
private SpacecraftConfiguration spacecraftConfig;
58+
private IServiceCoreContext context;
59+
private String host;
60+
private int port;
61+
private int caduLength;
62+
private int asmLength;
63+
64+
private final List<IForwardDataUnitStatusSubscriber> subscribers = new CopyOnWriteArrayList<>();
65+
private volatile Socket socket;
66+
67+
private final ExecutorService cltuForwarderExecutor = Executors.newSingleThreadExecutor(r -> {
68+
Thread t = new Thread(r, "CLTU/CADU - CLTU forward thread");
69+
t.setDaemon(true);
70+
return t;
71+
});
72+
73+
private Thread readingTmThread = null;
74+
75+
public CltuCaduTcpConnector() {
76+
super("CLTU/CADU Connector", "CLTU/CADU Connector");
77+
}
78+
79+
@Override
80+
protected Pair<Long, Long> computeBitrate() {
81+
Instant now = Instant.now();
82+
if(lastSamplingTime != null) {
83+
long theRxBytes = rxBytes; // Not atomic, but ... who cares
84+
long theTxBytes = txBytes; // Not atomic, but ... who cares
85+
rxBytes = 0;
86+
txBytes = 0;
87+
long msDiff = now.toEpochMilli() - lastSamplingTime.toEpochMilli();
88+
long rxRate = Math.round((theRxBytes * 8000.0) / (msDiff));
89+
long txRate = Math.round((theTxBytes * 8000.0) / (msDiff));
90+
lastSamplingTime = now;
91+
return Pair.of(txRate, rxRate);
92+
} else {
93+
lastSamplingTime = now;
94+
return null;
95+
}
96+
}
97+
98+
@Override
99+
protected synchronized void doConnect() throws TransportException {
100+
if(socket == null) {
101+
try {
102+
// Connect, start thread to read CADUs and send RawData to broker for processing (with TM frames)
103+
socket = new Socket(this.host, this.port);
104+
final InputStream dataStream = socket.getInputStream();
105+
readingTmThread = new Thread(() -> {
106+
readTm(dataStream);
107+
}, "CLTU/CADU - CADU reading thread");
108+
readingTmThread.setDaemon(true);
109+
readingTmThread.start();
110+
} catch (IOException e) {
111+
throw new TransportException(e);
112+
}
113+
}
114+
// Do nothing
115+
}
116+
117+
private void readTm(InputStream inputStream) {
118+
while(this.socket != null) {
119+
try {
120+
byte[] cadu = inputStream.readNBytes(this.caduLength);
121+
// TODO process CADU
122+
// Remove ASM
123+
124+
// if randomisation == ON, derandomise
125+
126+
// get first frame-length bytes
127+
128+
// build transfer frame info with configuration from spacecraft
129+
130+
// distribute to raw data broker
131+
} catch (IOException e) {
132+
LOG.log(Level.SEVERE, "Reading of CADU failed: connection error on read", e);
133+
}
134+
}
135+
}
136+
137+
@Override
138+
protected synchronized void doDisconnect() throws TransportException {
139+
if(socket != null) {
140+
try {
141+
socket.close();
142+
} catch (IOException e) {
143+
// Ignore
144+
}
145+
socket = null;
146+
}
147+
if(readingTmThread != null) {
148+
readingTmThread = null;
149+
}
150+
// Done
151+
}
152+
153+
@Override
154+
protected synchronized void doDispose() {
155+
try {
156+
disconnect();
157+
} catch (TransportException e) {
158+
// Do nothing
159+
}
160+
cltuForwarderExecutor.shutdownNow();
161+
}
162+
163+
@Override
164+
public void abort() throws TransportException, RemoteException {
165+
disconnect();
166+
}
167+
168+
@Override
169+
public void configure(String driverName, SpacecraftConfiguration configuration, IServiceCoreContext context, String connectorInformation) throws RemoteException {
170+
this.driverName = driverName;
171+
this.spacecraftConfig = configuration;
172+
this.context = context;
173+
String[] infoSpl = connectorInformation.split(":", -1); // String with format: "<ip>:<port>:<CADU length>:<asm length>"
174+
this.host = infoSpl[0];
175+
this.port = Integer.parseInt(infoSpl[1]);
176+
this.caduLength = Integer.parseInt(infoSpl[2]);
177+
this.asmLength = Integer.parseInt(infoSpl[3]);
178+
}
179+
180+
@Override
181+
public void register(IForwardDataUnitStatusSubscriber subscriber) throws RemoteException {
182+
this.subscribers.add(subscriber);
183+
}
184+
185+
@Override
186+
public void deregister(IForwardDataUnitStatusSubscriber subscriber) throws RemoteException {
187+
this.subscribers.remove(subscriber);
188+
}
189+
190+
@Override
191+
public synchronized void sendCltu(byte[] cltu, long externalId) throws RemoteException {
192+
if(this.cltuForwarderExecutor.isShutdown()) {
193+
LOG.severe(String.format("Transmission of CLTU with external ID %d failed: CLTU/CADU connector disposed", externalId));
194+
informSubscribers(externalId, ForwardDataUnitProcessingStatus.RELEASE_FAILED);
195+
return;
196+
}
197+
this.cltuForwarderExecutor.submit(() -> {
198+
Socket sock = this.socket;
199+
if (sock == null) {
200+
LOG.severe(String.format("Transmission of CLTU with external ID %d failed: CLTU/CADU connector is disconnected", externalId));
201+
informSubscribers(externalId, ForwardDataUnitProcessingStatus.RELEASE_FAILED);
202+
return;
203+
}
204+
// Try to send CLTU
205+
try {
206+
LOG.log(Level.INFO, String.format("Sending CLTU with ID %d: %s", externalId, StringUtil.toHexDump(cltu)));
207+
sock.getOutputStream().write(cltu);
208+
} catch (IOException e) {
209+
LOG.log(Level.SEVERE, String.format("Transmission of CLTU with external ID %d failed: CLTU/CADU connector error on write", externalId), e);
210+
informSubscribers(externalId, ForwardDataUnitProcessingStatus.RELEASE_FAILED);
211+
return;
212+
}
213+
// Sent
214+
informSubscribers(externalId, ForwardDataUnitProcessingStatus.RELEASED);
215+
});
216+
}
217+
218+
@Override
219+
public List<String> getSupportedRoutes() throws RemoteException {
220+
return Collections.singletonList("CLTU/CADU @ " + this.host + ":" + this.port);
221+
}
222+
223+
private void informSubscribers(long externalId, ForwardDataUnitProcessingStatus status) {
224+
informSubscribers(externalId, status, Instant.now());
225+
}
226+
227+
private void informSubscribers(long externalId, ForwardDataUnitProcessingStatus status, Instant time) {
228+
subscribers.forEach(o -> o.informStatusUpdate(externalId, status, time));
229+
}
230+
}

eu.dariolucia.reatmetric.driver.spacecraft/src/main/java/eu/dariolucia/reatmetric/driver/spacecraft/definition/TmDataLinkConfiguration.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ public class TmDataLinkConfiguration {
4444
@XmlAttribute(name = "aos-insert-zone-length")
4545
private int aosTransferFrameInsertZoneLength = 0;
4646

47+
@XmlAttribute(name = "frame-length")
48+
private int frameLength = -1;
49+
4750
public TmDataLinkConfiguration() {
4851
}
4952

@@ -103,4 +106,11 @@ public void setType(TransferFrameType type) {
103106
this.type = type;
104107
}
105108

109+
public void setFrameLength(int frameLength) {
110+
this.frameLength = frameLength;
111+
}
112+
113+
public int getFrameLength() {
114+
return frameLength;
115+
}
106116
}

eu.dariolucia.reatmetric.driver.spacecraft/src/main/java/module-info.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import eu.dariolucia.reatmetric.driver.spacecraft.SpacecraftDriver;
1818
import eu.dariolucia.reatmetric.driver.spacecraft.activity.TcPacketInfoValueExtensionHandler;
19+
import eu.dariolucia.reatmetric.driver.spacecraft.connectors.CltuCaduTcpConnector;
1920
import eu.dariolucia.reatmetric.driver.spacecraft.encoding.SpacePacketDecodingExtension;
2021
import eu.dariolucia.reatmetric.driver.spacecraft.encoding.SpacePacketEncodingExtension;
2122
import eu.dariolucia.reatmetric.driver.spacecraft.services.impl.CommandVerificationService;
@@ -61,4 +62,6 @@
6162
provides eu.dariolucia.ccsds.encdec.extension.IEncoderExtension with SpacePacketEncodingExtension;
6263

6364
provides eu.dariolucia.reatmetric.driver.spacecraft.services.IService with CommandVerificationService, OnboardEventService, OnboardOperationsSchedulingService, TimeCorrelationService;
65+
66+
provides eu.dariolucia.reatmetric.driver.spacecraft.activity.cltu.ICltuConnector with CltuCaduTcpConnector;
6467
}

eu.dariolucia.reatmetric.persist/src/test/java/eu/dariolucia/reatmetric/persist/ActivityOccurrenceDataArchiveTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ void testActivityOccurrenceDataStoreRetrieve() throws IOException, ArchiveExcept
166166
assertEquals("routeA", items.get(0).getRoute());
167167
assertEquals("routeB", items.get(1).getRoute());
168168
// retrieve: expected 2
169-
items = activityArchive.retrieve(t.plusMillis(2000), null, null);
169+
items = activityArchive.retrieve(t.plusMillis(2000), (ActivityOccurrenceDataFilter) null, null);
170170
assertEquals(2, items.size());
171171
assertEquals(1L, items.get(0).getInternalId().asLong());
172172
assertEquals(0L, items.get(1).getInternalId().asLong());

eu.dariolucia.reatmetric.persist/src/test/java/eu/dariolucia/reatmetric/persist/AlarmParameterDataArchiveTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ void testAlarmParameterDataStoreRetrieve() throws IOException, ArchiveException,
7676
));
7777
Thread.sleep(2000);
7878
// Retrieve at t + 250 ms
79-
List<AlarmParameterData> params = alarmDataArchive.retrieve(t.plusMillis(250), null, null);
79+
List<AlarmParameterData> params = alarmDataArchive.retrieve(t.plusMillis(250), (AlarmParameterDataFilter) null, null);
8080
assertEquals(3, params.size());
8181
for (AlarmParameterData pd : params) {
8282
if (pd.getPath().asString().equals("TEST.PARAM1")) {

eu.dariolucia.reatmetric.persist/src/test/java/eu/dariolucia/reatmetric/persist/ParameterDataArchiveTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ void testParameterDataStoreRetrieve() throws IOException, ArchiveException, Inte
7777
));
7878
Thread.sleep(2000);
7979
// Retrieve at t + 250 ms
80-
List<ParameterData> params = parameterDataArchive.retrieve(t.plusMillis(250), null, null);
80+
List<ParameterData> params = parameterDataArchive.retrieve(t.plusMillis(250), (ParameterDataFilter) null, null);
8181
assertEquals(3, params.size());
8282
for (ParameterData pd : params) {
8383
if (pd.getPath().asString().equals("TEST.PARAM1")) {

0 commit comments

Comments
 (0)