bwLehrpool Masterserver
Manages authentication and sharing of virtual machines between participating institutions
NetworkHandler.java
Go to the documentation of this file.
1 package org.openslx.imagemaster.localrpc;
2 
3 import java.io.IOException;
4 import java.net.DatagramPacket;
5 import java.net.DatagramSocket;
6 import java.net.InetAddress;
7 import java.net.SocketAddress;
8 import java.net.SocketException;
9 import java.nio.charset.StandardCharsets;
10 import java.util.concurrent.BlockingQueue;
11 import java.util.concurrent.LinkedBlockingQueue;
12 
13 import org.apache.logging.log4j.LogManager;
14 import org.apache.logging.log4j.Logger;
15 import org.openslx.bwlp.thrift.iface.ClientSessionData;
16 import org.openslx.bwlp.thrift.iface.TAuthorizationException;
17 import org.openslx.bwlp.thrift.iface.UserInfo;
20 
21 import com.google.gson.Gson;
22 
27 public class NetworkHandler implements Runnable
28 {
29 
30  private static final Logger log = LogManager.getLogger( NetworkHandler.class );
31 
32  private Thread sendThread = null;
36  private final Sender sender;
40  private final DatagramSocket socket;
44  private final Gson gson = new Gson();
45 
49  public NetworkHandler( int port, InetAddress listenAddress ) throws SocketException
50  {
51  socket = new DatagramSocket( port, listenAddress );
52  sendThread = new Thread( sender = new Sender() );
53  }
54 
55  public void shutdown()
56  {
57  socket.close();
58  }
59 
71  private void send( SocketAddress destination, byte[] buffer )
72  {
73  final DatagramPacket packet;
74  try {
75  packet = new DatagramPacket( buffer, buffer.length, destination );
76  } catch ( Exception e ) {
77  log.warn( "Could not construct datagram packet for target " + destination.toString() );
78  e.printStackTrace();
79  return;
80  }
81  sender.send( packet );
82  }
83 
87  @Override
88  public void run()
89  {
90  byte readBuffer[] = new byte[ 66000 ];
91  try {
92  sendThread.start();
93  while ( !Thread.interrupted() ) {
94  DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length );
95  try {
96  socket.receive( packet );
97  } catch ( IOException e ) {
98  log.info( "IOException on UDP socket when reading: " + e.getMessage() );
99  Thread.sleep( 100 );
100  continue;
101  }
102  if ( packet.getLength() < 2 ) {
103  log.debug( "Message too short" );
104  continue;
105  }
106  String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 );
107  try {
108  String reply = handle( payload );
109  if ( reply != null )
110  send( packet.getSocketAddress(), reply.getBytes( StandardCharsets.UTF_8 ) );
111  } catch ( Throwable t ) {
112  log.error( "Exception in RequestParser: " + t.toString() );
113  log.error( "Payload was: " + payload );
114  t.printStackTrace();
115  }
116  }
117  } catch ( InterruptedException e ) {
118  Thread.currentThread().interrupt();
119  } finally {
120  sendThread.interrupt();
121  log.info( "UDP receiver finished." );
122  }
123  }
124 
125  private String handle( String payload )
126  {
127  try {
128  JsonUser ju = gson.fromJson( payload, JsonUser.class );
129  TAuthorizationException ex = ju.toException();
130  if ( ex != null ) {
132  return "Auth error";
133  } else {
134  UserInfo u = ju.toUser();
135  if ( u == null ) {
136  log.warn( "Invalid or inomplete RPC data (" + payload + ")" );
137  return "Invalid or incomplete RPC data";
138  }
139  ClientSessionData sd = SessionManager.addSession( new Session( u, ju.timeoutSeconds() * 1000 ), ju.accessCode() );
140  return "TOKEN:" + sd.authToken + " SESSIONID:" + sd.sessionId;
141  }
142  } catch ( Throwable t ) {
143  log.error( "Exception on json decode", t );
144  }
145  return "Json error";
146  }
147 
152  private class Sender implements Runnable
153  {
154 
158  private final BlockingQueue<DatagramPacket> queue = new LinkedBlockingQueue<>( 128 );
159 
163  @Override
164  public void run()
165  {
166  try {
167  while ( !Thread.interrupted() ) {
168  final DatagramPacket packet;
169  packet = queue.take();
170  try {
171  socket.send( packet );
172  } catch ( IOException e ) {
173  log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString() );
174  }
175  }
176  } catch ( InterruptedException e ) {
177  Thread.currentThread().interrupt();
178  } finally {
179  log.info( "UDP sender finished." );
180  }
181  }
182 
187  public void send( DatagramPacket packet )
188  {
189  if ( queue.offer( packet ) )
190  return;
191  log.warn( "Could not add packet to queue: Full" );
192  }
193 
194  }
195 
196 }
TAuthorizationException toException()
Definition: JsonUser.java:47
NetworkHandler(int port, InetAddress listenAddress)
Initialize the NetworkHandler by starting threads and opening the socket.
final Sender sender
Sender instance (Runnable handling outgoing packets)
void run()
Main loop of receiving thread - wait until a packet arrives, then try to handle/decode.
void send(DatagramPacket packet)
Add something to the outgoing packet queue.
final DatagramSocket socket
UDP socket for sending and receiving.
static void addAuthError(TAuthorizationException ex, String accessToken)
static ClientSessionData addSession(Session session)
void send(SocketAddress destination, byte[] buffer)
Prepare and enqueue reply for client request.
Simple representation of a user session.
Definition: Session.java:11
void run()
Wait until something is put into the queue, then send it.
Class for managing active user sessions.
The network listener that will receive incoming UDP packets, try to process them, and then send a rep...
final BlockingQueue< DatagramPacket > queue
Queue to stuff outgoing packets into.