1 package org.openslx.imagemaster.localrpc;
3 import java.io.IOException;
4 import java.net.DatagramPacket;
5 import java.net.DatagramSocket;
6 import java.net.InetAddress;
7 import java.net.SocketAddress;
8 import java.net.SocketException;
9 import java.nio.charset.StandardCharsets;
10 import java.util.concurrent.BlockingQueue;
11 import java.util.concurrent.LinkedBlockingQueue;
13 import org.apache.logging.log4j.LogManager;
14 import org.apache.logging.log4j.Logger;
15 import org.
openslx.bwlp.thrift.iface.ClientSessionData;
16 import org.
openslx.bwlp.thrift.iface.TAuthorizationException;
21 import com.google.gson.Gson;
44 private final Gson
gson =
new Gson();
49 public NetworkHandler(
int port, InetAddress listenAddress )
throws SocketException
51 socket =
new DatagramSocket( port, listenAddress );
52 sendThread =
new Thread( sender =
new Sender() );
71 private void send( SocketAddress destination, byte[] buffer )
73 final DatagramPacket packet;
75 packet =
new DatagramPacket( buffer, buffer.length, destination );
76 }
catch ( Exception e ) {
77 log.warn(
"Could not construct datagram packet for target " + destination.toString() );
81 sender.
send( packet );
90 byte readBuffer[] =
new byte[ 66000 ];
93 while ( !Thread.interrupted() ) {
94 DatagramPacket packet =
new DatagramPacket( readBuffer, readBuffer.length );
96 socket.receive( packet );
97 }
catch ( IOException e ) {
98 log.info(
"IOException on UDP socket when reading: " + e.getMessage() );
102 if ( packet.getLength() < 2 ) {
103 log.debug(
"Message too short" );
106 String payload =
new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 );
108 String reply =
handle( payload );
110 send( packet.getSocketAddress(), reply.getBytes( StandardCharsets.UTF_8 ) );
111 }
catch ( Throwable t ) {
112 log.error(
"Exception in RequestParser: " + t.toString() );
113 log.error(
"Payload was: " + payload );
117 }
catch ( InterruptedException e ) {
118 Thread.currentThread().interrupt();
120 sendThread.interrupt();
121 log.info(
"UDP receiver finished." );
136 log.warn(
"Invalid or inomplete RPC data (" + payload +
")" );
137 return "Invalid or incomplete RPC data";
140 return "TOKEN:" + sd.authToken +
" SESSIONID:" + sd.sessionId;
142 }
catch ( Throwable t ) {
143 log.error(
"Exception on json decode", t );
158 private final BlockingQueue<DatagramPacket>
queue =
new LinkedBlockingQueue<>( 128 );
167 while ( !Thread.interrupted() ) {
168 final DatagramPacket packet;
169 packet = queue.take();
171 socket.send( packet );
172 }
catch ( IOException e ) {
173 log.debug(
"Could not send UDP packet to " + packet.getAddress().getHostAddress().toString() );
176 }
catch ( InterruptedException e ) {
177 Thread.currentThread().interrupt();
179 log.info(
"UDP sender finished." );
187 public void send( DatagramPacket packet )
189 if ( queue.offer( packet ) )
191 log.warn(
"Could not add packet to queue: Full" );
TAuthorizationException toException()
NetworkHandler(int port, InetAddress listenAddress)
Initialize the NetworkHandler by starting threads and opening the socket.
final Sender sender
Sender instance (Runnable handling outgoing packets)
void run()
Main loop of receiving thread - wait until a packet arrives, then try to handle/decode.
void send(DatagramPacket packet)
Add something to the outgoing packet queue.
final DatagramSocket socket
UDP socket for sending and receiving.
String handle(String payload)
static void addAuthError(TAuthorizationException ex, String accessToken)
static ClientSessionData addSession(Session session)
void send(SocketAddress destination, byte[] buffer)
Prepare and enqueue reply for client request.
Simple representation of a user session.
void run()
Wait until something is put into the queue, then send it.
Class for managing active user sessions.
The network listener that will receive incoming UDP packets, try to process them, and then send a rep...
final BlockingQueue< DatagramPacket > queue
Queue to stuff outgoing packets into.
final Gson gson
Gson class.