Industria 4.0: come simulare un flusso di dati durante il monitoraggio di un processo
Overview
In questo progetto costruiremo un’applicazione immaginata per simulare i dati ricevuti dagli attuatori di un estrusore di plastica. Si tratta di un caso specifico che ci è capitato realmente, ma il progetto si presta facilmente ad essere adattato ad altri contesti.
Per simulare la presenza di N misuratori abbiamo usato un processo con più thread. Ogni thread rappresenta un sensore con i suoi dati, ognuno è dotato di un parametro Id, che lo rende identificabile, e può appartenere a diverse tipologie (Circolare / Lineare).
Una volta in esecuzione il simulatore espone delle rotte che interrogate dal client forniscono l’output desiderato: ogni thread restituisce un file JSON al ritmo di uno al secondo. Per ottenere questo risultato abbiamo usato la tecnologia WebSocket, che permette una comunicazione client-server, continua, bidirezionale, full-duplex (può avvenire in entrambe le direzioni simultaneamente) e a bassa latenza.
Il nostro simulatore è dunque sostanzialmente un’applicazione realizzata con multithreading e WebSocket che rende in output un flusso di dati, in formato JSON, su delle rotte prestabilite. Nel nostro caso questi files rappresentano un flusso di dati emessi da degli attuatori, ma secondo le vostre esigenze potete immaginare rappresentino altro.
Struttura del progetto
Il progetto ha una struttura semplice:
src/main/java/it/wellnetconsulting
- App: la classe con il metodo main.
- CrunchifyJSONFileWrite: la classe che si occupa di produrre i file JSON.
- DataWebsocket: la nostra websocket.
- Measurer: crea l’oggetto “Misuratore”, ovvero il thread che simula l’invio dei dati del sensore.
- MeasurersManager: classe manager che inserisce gli oggetti di tipo Meausure in una hashMap, con il parametro “id” come chiave, e ritrova e restituisce gli oggetti inseriti con un metodo get.
test/java/it/wellnetconsulting
- AppleTest: classe di Test realizzata con il framework di testing Junit.
Ci sono diversi aspetti interessanti in questo progetto che racconterò sinteticamente in questo articolo. In particolare: Spark Java, Lambda Expressions, Multithreading, Websocket.
Spark Java
L’applicazione è stata sviluppata in Java 1.8 con il framework Spark Java. Spark è un web framework minimale, open source, che non segue il modello MVC, ma è pensato piuttosto per creare applicazioni in modo particolarmente semplice e veloce. Spark è costruito intorno alla gestione di rotte e rispettivi handler. Ogni rotta si compone di tre elementi: un verbo, un path, e un handler. E il codice risulta particolarmente compatto e leggibile, grazie all’uso delle espressioni Lambda, introdotte da Java 1.8.
Ecco ad esempio i metodi, che abbiamo scritto nella classe App, per far partire o arrestare i thread:
post("protune/measurer/start_measurer/:measurer_id", (request, response) -> { System.out.println("Got start_measurer request"); Measurer measurer = MeasurersManager.getMeasurerById(request.params(":measurer_id")); if(measurer != null) measurer.resumeWork(); return ""; }); post("protune/measurer/stop_measurer/:measurer_id", (request, response) -> { System.out.println("Got stop_measurer request"); Measurer measurer = MeasurersManager.getMeasurerById(request.params(":measurer_id")); if(measurer != null) measurer.pauseWork(); return ""; });
Le rotte indicate,
protune/measurer/start_measurer/:measurer_id protune/measurer/stop_measurer/:measurer_id
sono di tipo POST, e prevedono che si dia in ingresso il parametro meausure_id, che identifica il thread da lanciare o arrestare.
Nel nostro progetto abbiamo creato tre thread, chiamati: M0, M1, M2. Questi saranno i valori che il parametro meausure_id potrà assumere. Quando il client interrogherà le rotte programmate, potrà far partire o arrestare l’invio di dati dei differenti thread, ottenendo l’output previsto.
Se invece vogliamo vedere l’output a console possiamo lanciare a terminale il comando:
curl -X POST http://localhost:4567/protune/measurer/start_measurer/M0
Come abbiamo detto, l’ultima parte dopo lo slash, corrisponde a measurer_id e identifica il thread a cui il comando si riferisce. Nel nostro esempio quindi avvieremo il thread M0, richiamando il metodo di avvio measurer.resumeWork(). Il risultato sarà una sequenza di oggetti JSON, con questa struttura:
M0: I am alive Measurer.run() 0 Update_Position Successfully Copied JSON Object to File... JSON Object: {"payload":{"measurer":{"cursor_position":0,"values":["0|0.16200453248476343"]}},"type":"UPDATE_POSITION","rotation_direction":"false"}
Potremo poi arrestare M0 con il comando apposito, che richiama il metodo measurer.pauseWork():
curl -X POST http://localhost:4567/protune/measurer/stop_measurer/M0
Abbiamo cercato di dare ai metodi dei nomi parlanti, come pauseWork, e resumeWork, nell’ottica di rendere il nostro codice il più leggibile possibile.
Threading
Il progetto simula diversi misuratori, dotati di sensori, che inviano dati su un processo in corso (nel nostro caso l’estrusione della plastica). E come abbiamo iniziato a vedere, l'attivazione di un misuratore corrisponde all'avvio di un thread, che ad ogni ciclo di clock effettua il push dei dati sul Websocket. Nel progetto abbiamo i tre thread M0, M1, M2, che possono avere ciascuno due tipi di ciclo diversi, chiamati Linear e Circular. Il ciclo principale, Linear, va da 1 a 90, il secondo tipo di ciclo, Circular, è simile, ma arriva fino al quarantacinquesimo valore. Al termine del giro di misurazione, l’output è diverso: l’ultimo passaggio non rende più un oggetto UPDATE_POSITION, ma un oggetto UPDATE_PROFILES:
JSON Object: {"payload":{"measurer":{"cursor_position":356,"values":["....dati...."]}},"type":"UPDATE_POSITION","rotation_direction":"true"} Update_Profiles Successfully Copied JSON Object to File...
Se un thread non viene fermato con il comando apposito, al termine di un ciclo di un tipo prosegue con un ciclo dell’altro. Il passaggio da un tipo di ciclo all’altro è controllato da un semplice boolean, che si inverte quando il ciclo è ultimato:
rotation = !rotation;
Per impostare il controllo dei thread, ci siamo avvalsi della sincronizzazione su una variabile usata come lock, dei metodi wait() e notifyAll(), e del metodo sleep().
La classe Measures, che è la rappresentazione logica di un misuratore, estende la classe Thread e ha al suo interno un metodo run(), override del metodo run() di Thread, e i metodi pauseWork() e resumeWork(), che abbiamo già incontrato.
Il metodo run() non deve fermarsi fino a che non viene arrestato con il comando apposito. Quindi ha al suo interno un ciclo while(true), che fa sì che il thread continui a correre. E potrà fermarsi solo quando lo arresteremo con il metodo pauseWork().
All’interno del ciclo while(true) impostiamo innanzitutto la sincronizzazione del thread sull’oggetto pauseLock, e quindi eseguiamo un controllo sul boolean “paused”: se questo risulta true, l’esecuzione del thread viene sospesa, con il metodo waith().
if (paused) { randoVal.clear(); ciclo = 0; rotation =false; try { pauseLock.wait(); } catch (InterruptedException ex) { } }
A impostare il valore di “paused” su true è il metodo pauseWork(), che viene chiamato dal client, con la rotta
protune/measurer/stop_measurer/:measurer_id
Il thread dormiente si risveglierà quando sarà chiamato il metodo resumeWork(). Questo metodo farà partire il thread lanciando il metodo notifyAll() sull’oggetto pauseLock, su cui il nostro thread è sincronizzato. Impostando il boolean “paused” a false, prima di risvegliare il thread con notifyAll(), il nostro ciclo while(true) supererà l’if posto all’inizio e il thread farà la sua corsa.
public void resumeWork() { synchronized (pauseLock) { paused = false; pauseLock.notifyAll(); } }
Dal momento che come output vogliamo che ogni thread ci dia un JSON al secondo, il nostro ciclo while si conclude con:
try { Thread.sleep(1000); } catch (InterruptedException ex) { }
Il metodo sleep(1000) mette in pausa il thread in esecuzione per il tempo specificato, espresso in millisecondi.
JSON
JSON (Java Script Object Notation) è un semplice formato per lo scambio di dati: un file JSON è composto, sostanzialmente, da un insieme di coppie nome/valore. Il nome deve essere una stringa, e il valore deve essere un tipo di dato JSON valido.
JSON |
Java |
string |
java.lang.String |
number |
java.lang.Number |
true|false |
java.lang.Boolean |
null |
null |
array |
java.util.List |
object |
java.util.Map |
I nostri dati durante il ciclo di misurazione sono espressi nel formato rappresentato dall’oggetto UPDATE_POSITION. Invece al termine di un intero giro di misurazione il thread invia l’oggetto UPDATE_PROFILES.
Entrambi gli oggetti, di cui abbiamo visto sopra la struttura, sono prodotti dalla classe CrunchifyJSONFileWrite, che importa le classi org.json.simple.JSONObject e org.json.simple.JSONArray dalla libreria json-simple 1.1.1.
Un JSONObject è una mappa (java.util.Map) e una JSONArray è una lista (java.util.List). I nostri oggetti JSON sono restituiti dai metodi Update_Position() e Update_Profile() sotto forma di stringhe:
try { System.out.println("Update_Position Successfully Copied JSON Object to File..."); System.out.println("\nJSON Object: " + obj); return obj.toJSONString(); } catch (Exception e) { return ""; }
Obj, il JSONObject che i metodi rendono sotto forma di stringa, è costruito con una serie di oggetti JSON uno dentro l’altro.
Websocket
Per usare la tecnologia websocket abbiamo incluso le librerie:
java-websocket-1.3.o.jar javax.websocket-api-1.0-rc4.jar
le librerie di Tomcat (il server che abbiamo usato per far girare l’applicazione su Eclipse):
tomcat-websocket-9.0.2.jar tomcat-websocket-api-9.0.2.jar
e importato le seguenti classi di Jetty, il server embedded in Spark:
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.api.annotations.WebSocket;
La classe che crea la nostra WebSocket è DataWebsocket, a cui abbiamo attribuito l’annotation @WebSocket, in questo modo:
@WebSocket public class DataWebsocket {
In questa classe abbiamo creato una lista che raccoglie gli elementi di tipo Session, e poiché facciamo viaggiare nella nostra WebSocket più thread in contemporanea, è importante che si tratti di un tipo di lista concurrent safe. Nel nostro caso ci siamo avvalsi di una ConcurrentLinkedDeque:
private static final Queue<Session> sessions = new ConcurrentLinkedDeque<>();
Su questa lista siamo andati a iterare con un for each, inviando un messaggio per ogni elemento Session presente in sessions, con il metodo sendMessage(String message)
public static void sendMessage(String message) throws IOException { for (Session session : sessions) { session.getRemote().sendString(message); } }
La sessione viene passata al Message, quando occorre l’evento. La risposta è un messaggio, passato in ingresso al metodo sendMessage(). Il metodo lancia una IOException che nel nostro progetto viene catturata dal try catch presente nella classe Measurer.
Il metodo connected (Session session), prende in ingresso la sessione e la aggiunge alla lista “sessions”. L’annotazione @OnWebsocketConnect contrassegna il metodo come quello che riceve l’evento On Connect.
@OnWebSocketConnect public void connected (Session session) { sessions.add(session); }
Il metodo closed() con l’annotazione @OnWebSocketClose è contrassegnato come il metodo che riceve l’evento On Close.
@OnWebSocketClose public void closed(Session session, int statusCode, String reason) { sessions.remove(session); }
La gestione delle dipendenze
Un’ultima osservazione sulle dipendenze. Abbiamo realizzato il progetto con Maven (Setting up Spark with Maven), le dipendenze sono quindi gestite attraverso un file POM.xml (Project Object Manager). Il POM in Maven è il file di configurazione che contiene tutte le informazioni sul progetto e si occupa di gestire e scaricare in automatico le librerie necessarie. Ad es. nel caso di Spark l'xml del nostro POM includerà fra le dipendenze
<dependency> <groupId>com.sparkjava</groupId> <artifactId>spark-core</artifactId> <version>2.7.2</version> <scope>compile</scope> </dependency>
ma non c’è bisogno di scrivere codice xml, perchè c’è una comoda interfaccia che permette di aggiungere le dipendenze. Per la nostra applicazione sono spark-core : 2.7.2, json-simple 1.1.1 e Junit 3.8.1, per il testing.