bwLehrpool Masterserver
Manages authentication and sharing of virtual machines between participating institutions
ConnectionHandler.java
Go to the documentation of this file.
1 package org.openslx.imagemaster.serverconnection;
2 
3 import java.io.File;
4 import java.io.FileInputStream;
5 import java.io.FileNotFoundException;
6 import java.io.IOException;
7 import java.nio.ByteBuffer;
8 import java.security.KeyStore;
9 import java.sql.SQLException;
10 import java.util.Iterator;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.concurrent.ConcurrentHashMap;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.SynchronousQueue;
16 import java.util.concurrent.TimeUnit;
17 
18 import javax.net.ssl.KeyManager;
19 import javax.net.ssl.KeyManagerFactory;
20 import javax.net.ssl.SSLContext;
21 
22 import org.apache.logging.log4j.LogManager;
23 import org.apache.logging.log4j.Logger;
24 import org.openslx.bwlp.thrift.iface.ImagePublishData;
25 import org.openslx.bwlp.thrift.iface.InvocationError;
26 import org.openslx.bwlp.thrift.iface.TInvocationException;
27 import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
28 import org.openslx.bwlp.thrift.iface.TransferInformation;
29 import org.openslx.filetransfer.Downloader;
30 import org.openslx.filetransfer.IncomingEvent;
31 import org.openslx.filetransfer.Listener;
32 import org.openslx.filetransfer.Uploader;
35 import org.openslx.thrifthelper.ImagePublishDataEx;
36 import org.openslx.util.CascadedThreadPoolExecutor;
37 import org.openslx.util.QuickTimer;
38 import org.openslx.util.QuickTimer.Task;
39 
44 public class ConnectionHandler implements IncomingEvent
45 {
46 
47  private static final Logger LOGGER = LogManager.getLogger( ConnectionHandler.class );
48 
49  private static final int MAX_TRANSFERS = 12;
50 
51  private static Map<String, IncomingTransfer> incomingTransfersByTransferId = new ConcurrentHashMap<>();
52  private static final Map<String, IncomingTransfer> incomingTransfersByVersionId = new ConcurrentHashMap<>();
53 
54  private static Map<String, OutgoingTransfer> outgoingTransfers = new ConcurrentHashMap<>();
55  private static IncomingEvent eventHandler = new ConnectionHandler();
56  private final ExecutorService transferPool = new CascadedThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES,
57  new SynchronousQueue<Runnable>(),
58  new PrioThreadFactory( "TransferPool", Thread.NORM_PRIORITY - 2 ), null );
59 
60  private static final Listener plainListener;
61  private static final Listener sslListener;
62 
63  static {
64  LOGGER.debug( "Starting BFTP on port " + Globals.getFiletransferPortSsl() + "+ and " + Globals.getFiletransferPortPlain() );
65  Listener ssl = null;
66  Listener plain = null;
67  try {
68  String pathToKeyStore = Globals.getSslKeystoreFile();
69  char[] passphrase = Globals.getSslKeystorePassword().toCharArray();
70  KeyStore keystore = KeyStore.getInstance( "JKS" );
71  keystore.load( new FileInputStream( pathToKeyStore ), passphrase );
72  KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() );
73  kmf.init( keystore, passphrase );
74  SSLContext sslContext = SSLContext.getInstance( "TLSv1.2" );
75  KeyManager[] keyManagers = kmf.getKeyManagers();
76  sslContext.init( keyManagers, null, null );
77  if ( Globals.getFiletransferPortSsl() > 0 ) {
78  ssl = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 );
79  ssl.start();
80  }
81  if ( Globals.getFiletransferPortPlain() > 0 ) {
82  plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 );
83  plain.start();
84  }
85  // TODO: Bail out/retry if failed
86  } catch ( Exception e ) {
87  LOGGER.error( "Initialization failed.", e );
88  System.exit( 2 );
89  }
90  sslListener = ssl;
91  plainListener = plain;
92  QuickTimer.scheduleAtFixedDelay( new Task() {
93 
94  @Override
95  public void fire()
96  {
97  long now = System.currentTimeMillis();
98  for ( Iterator<IncomingTransfer> it = incomingTransfersByTransferId.values().iterator(); it.hasNext(); ) {
99  IncomingTransfer t = it.next();
100  if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) {
101  LOGGER.debug( "Removing transfer " + t.getId() );
102  it.remove();
103  }
104  }
105  for ( Iterator<IncomingTransfer> it = incomingTransfersByVersionId.values().iterator(); it.hasNext(); ) {
106  IncomingTransfer t = it.next();
107  if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) {
108  it.remove();
109  }
110  }
111  }
112  }, 10000, 301000 );
113  }
114 
115  public static int getSslPort()
116  {
117  if ( sslListener != null && sslListener.isRunning() )
118  return sslListener.getPort();
119  return 0;
120  }
121 
122  public static int getPlainPort()
123  {
124  if ( plainListener != null && plainListener.isRunning() )
125  return plainListener.getPort();
126  return 0;
127  }
128 
138  public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes, ImagePublishDataEx existing )
139  throws TTransferRejectedException, TInvocationException
140  {
141  IncomingTransfer transfer;
142  synchronized ( incomingTransfersByTransferId ) {
143  transfer = incomingTransfersByVersionId.get( img.imageVersionId );
144  if ( transfer == null ) {
145  if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
146  throw new TTransferRejectedException( "Too many active transfers" );
147  }
148  File absDestination;
149  if ( existing == null ) {
150  absDestination = new File( new File( Globals.getImageDir(), img.imageBaseId ), img.imageVersionId );
151  } else {
152  absDestination = new File( Globals.getImageDir(), existing.exImagePath );
153  }
154  try {
155  transfer = new IncomingTransfer( img, blockHashes, absDestination, getPlainPort(), getSslPort() );
156  } catch ( FileNotFoundException e ) {
157  LOGGER.warn( "Cannot init download to " + absDestination.toString(), e );
158  throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" );
159  }
160  LOGGER.info( "New incoming upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName + ")" );
161  incomingTransfersByTransferId.put( transfer.getId(), transfer );
162  incomingTransfersByVersionId.put( img.imageVersionId, transfer );
163  } else {
164  LOGGER.info( "Another request for existing upload: " + transfer.getId() + " for " + img.imageVersionId + " (" + img.imageName
165  + ")" );
166  }
167  }
168  return transfer;
169  }
170 
171  public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List<ByteBuffer> crcSums )
172  throws TTransferRejectedException
173  {
174  IncomingTransfer transfer = incomingTransfersByVersionId.get( imageData.imageVersionId );
175  if ( transfer == null )
176  return null;
177  if ( transfer.getFileSize() != imageData.fileSize )
178  throw new TTransferRejectedException( "File size mismatch" );
179  if ( !transfer.hashesEqual( crcSums ) )
180  throw new TTransferRejectedException( "Block hashes mismatch" );
181  return transfer;
182  }
183 
184  public static IncomingTransfer getUploadByToken( String uploadToken )
185  {
186  return incomingTransfersByTransferId.get( uploadToken );
187  }
188 
192  @Override
193  public void incomingDownloadRequest( final Uploader uploader )
194  {
195  OutgoingTransfer transfer = outgoingTransfers.get( uploader.getToken() );
196  if ( transfer == null ) {
197  LOGGER.debug( "Unknown download token received" );
198  uploader.sendErrorCode( "Unknown download token." );
199  uploader.cancel();
200  return;
201  }
202  if ( getDownloadConnectionCount() >= MAX_TRANSFERS ) {
203  uploader.sendErrorCode( "Too many concurrent uploads." );
204  uploader.cancel();
205  }
206  if ( !transfer.addConnection( uploader, transferPool ) ) {
207  uploader.cancel();
208  }
209  }
210 
214  @Override
215  public void incomingUploadRequest( final Downloader downloader ) throws IOException
216  {
217  IncomingTransfer transfer = incomingTransfersByTransferId.get( downloader.getToken() );
218  if ( transfer == null ) {
219  downloader.sendErrorCode( "Unknown upload token." );
220  downloader.cancel();
221  return;
222  }
223  if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
224  downloader.sendErrorCode( "Too many concurrent uploads." );
225  downloader.cancel();
226  return;
227  }
228  if ( !transfer.addConnection( downloader, transferPool ) ) {
229  downloader.cancel();
230  }
231  }
232 
233  public static int getUploadConnectionCount()
234  {
235  final long now = System.currentTimeMillis();
236  int active = 0;
237  for ( IncomingTransfer t : incomingTransfersByTransferId.values() ) {
238  if ( t.countsTowardsConnectionLimit( now ) ) {
239  active += t.getActiveConnectionCount();
240  }
241  }
242  return active;
243  }
244 
245  public static int getDownloadConnectionCount()
246  {
247  final long now = System.currentTimeMillis();
248  int active = 0;
249  for ( OutgoingTransfer t : outgoingTransfers.values() ) {
250  if ( t.countsTowardsConnectionLimit( now ) ) {
251  active += t.getActiveConnectionCount();
252  }
253  }
254  return active;
255  }
256 
257  public static void removeUpload( IncomingTransfer transfer )
258  {
259  incomingTransfersByTransferId.remove( transfer.getId() );
260  incomingTransfersByVersionId.remove( transfer.getImageVersionId() );
261  }
262 
263  public static TransferInformation registerDownload( ImagePublishDataEx img ) throws TTransferRejectedException, TInvocationException
264  {
265  OutgoingTransfer transfer;
266  File absSource;
267  absSource = new File( Globals.getImageDir(), img.exImagePath );
268  if ( !absSource.exists() ) {
269  LOGGER.error( absSource.toString() + " missing!" );
270  try {
271  DbImage.markValid( img.imageVersionId, false );
272  } catch ( SQLException e ) {
273  }
274  throw new TTransferRejectedException( "File missing on server" );
275  }
276  if ( absSource.length() != img.fileSize ) {
277  LOGGER.error( absSource.toString() + " has unexpected size (is: " + absSource.length() + ", should: " + img.fileSize + ")" );
278  try {
279  DbImage.markValid( img.imageVersionId, false );
280  } catch ( SQLException e ) {
281  }
282  throw new TTransferRejectedException( "File corrupted on server" );
283  }
284  synchronized ( outgoingTransfers ) {
286  throw new TTransferRejectedException( "Too many active transfers" );
287  }
288  plainListener.start();
289  sslListener.start();
290  transfer = new OutgoingTransfer( absSource, getPlainPort(), getSslPort() );
291  outgoingTransfers.put( transfer.getId(), transfer );
292  }
293  return transfer.getTransferInfo();
294  }
295 
296 }
static int getFiletransferPortSsl()
Definition: Globals.java:90
static int getFiletransferTimeout()
Definition: Globals.java:100
static Map< String, OutgoingTransfer > outgoingTransfers
void incomingDownloadRequest(final Uploader uploader)
Server is uploading - client is downloading!
static IncomingTransfer getUploadByToken(String uploadToken)
Representing an image in the database.
Definition: DbImage.java:32
Class to handle all incoming and outgoing connections.
static IncomingTransfer getExistingUpload(ImagePublishData imageData, List< ByteBuffer > crcSums)
Class to hold global constants and properties from 'config/global.properties'.
Definition: Globals.java:16
static String getSslKeystoreFile()
Definition: Globals.java:139
static TransferInformation registerDownload(ImagePublishDataEx img)
static void markValid(String imageVersionId, boolean isValid)
Definition: DbImage.java:146
static int getFiletransferPortPlain()
Definition: Globals.java:95
static String getSslKeystorePassword()
Definition: Globals.java:149
void incomingUploadRequest(final Downloader downloader)
Server is downloading - client is uploading!
static final Map< String, IncomingTransfer > incomingTransfersByVersionId
static IncomingTransfer registerUpload(ImagePublishData img, List< ByteBuffer > blockHashes, ImagePublishDataEx existing)
Register new incoming transfer from a satellite server.
static Map< String, IncomingTransfer > incomingTransfersByTransferId