1 package org.openslx.imagemaster.serverconnection;
4 import java.io.FileInputStream;
5 import java.io.FileNotFoundException;
6 import java.io.IOException;
7 import java.nio.ByteBuffer;
8 import java.security.KeyStore;
9 import java.sql.SQLException;
10 import java.util.Iterator;
11 import java.util.List;
13 import java.util.concurrent.ConcurrentHashMap;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.SynchronousQueue;
16 import java.util.concurrent.TimeUnit;
18 import javax.net.ssl.KeyManager;
19 import javax.net.ssl.KeyManagerFactory;
20 import javax.net.ssl.SSLContext;
22 import org.apache.logging.log4j.LogManager;
23 import org.apache.logging.log4j.Logger;
24 import org.
openslx.bwlp.thrift.iface.ImagePublishData;
25 import org.
openslx.bwlp.thrift.iface.InvocationError;
26 import org.
openslx.bwlp.thrift.iface.TInvocationException;
27 import org.
openslx.bwlp.thrift.iface.TTransferRejectedException;
28 import org.
openslx.bwlp.thrift.iface.TransferInformation;
35 import org.
openslx.thrifthelper.ImagePublishDataEx;
36 import org.
openslx.util.CascadedThreadPoolExecutor;
56 private final ExecutorService
transferPool =
new CascadedThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES,
57 new SynchronousQueue<Runnable>(),
66 Listener plain = null;
70 KeyStore keystore = KeyStore.getInstance(
"JKS" );
71 keystore.load(
new FileInputStream( pathToKeyStore ), passphrase );
72 KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() );
73 kmf.init( keystore, passphrase );
74 SSLContext sslContext = SSLContext.getInstance(
"TLSv1.2" );
75 KeyManager[] keyManagers = kmf.getKeyManagers();
76 sslContext.init( keyManagers, null, null );
86 }
catch ( Exception e ) {
87 LOGGER.error(
"Initialization failed.", e );
91 plainListener = plain;
92 QuickTimer.scheduleAtFixedDelay(
new Task() {
97 long now = System.currentTimeMillis();
98 for ( Iterator<IncomingTransfer> it = incomingTransfersByTransferId.values().iterator(); it.hasNext(); ) {
100 if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) {
101 LOGGER.debug(
"Removing transfer " + t.getId() );
105 for ( Iterator<IncomingTransfer> it = incomingTransfersByVersionId.values().iterator(); it.hasNext(); ) {
107 if ( t.isComplete( now ) || t.hasReachedIdleTimeout( now ) ) {
117 if ( sslListener != null && sslListener.isRunning() )
118 return sslListener.getPort();
124 if ( plainListener != null && plainListener.isRunning() )
125 return plainListener.getPort();
139 throws TTransferRejectedException, TInvocationException
143 transfer = incomingTransfersByVersionId.get( img.imageVersionId );
144 if ( transfer == null ) {
146 throw new TTransferRejectedException(
"Too many active transfers" );
149 if ( existing == null ) {
150 absDestination =
new File(
new File(
Globals.
getImageDir(), img.imageBaseId ), img.imageVersionId );
156 }
catch ( FileNotFoundException e ) {
157 LOGGER.warn(
"Cannot init download to " + absDestination.toString(), e );
158 throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR,
"File access error" );
160 LOGGER.info(
"New incoming upload: " + transfer.getId() +
" for " + img.
imageVersionId +
" (" + img.imageName +
")" );
161 incomingTransfersByTransferId.put( transfer.getId(), transfer );
162 incomingTransfersByVersionId.put( img.imageVersionId, transfer );
164 LOGGER.info(
"Another request for existing upload: " + transfer.getId() +
" for " + img.
imageVersionId +
" (" + img.imageName
172 throws TTransferRejectedException
174 IncomingTransfer transfer = incomingTransfersByVersionId.get( imageData.imageVersionId );
175 if ( transfer == null )
177 if ( transfer.getFileSize() != imageData.fileSize )
178 throw new TTransferRejectedException(
"File size mismatch" );
179 if ( !transfer.hashesEqual( crcSums ) )
180 throw new TTransferRejectedException(
"Block hashes mismatch" );
186 return incomingTransfersByTransferId.get( uploadToken );
196 if ( transfer == null ) {
197 LOGGER.debug(
"Unknown download token received" );
198 uploader.sendErrorCode(
"Unknown download token." );
203 uploader.sendErrorCode(
"Too many concurrent uploads." );
206 if ( !transfer.addConnection( uploader, transferPool ) ) {
217 IncomingTransfer transfer = incomingTransfersByTransferId.get( downloader.getToken() );
218 if ( transfer == null ) {
219 downloader.sendErrorCode(
"Unknown upload token." );
224 downloader.sendErrorCode(
"Too many concurrent uploads." );
228 if ( !transfer.addConnection( downloader, transferPool ) ) {
235 final long now = System.currentTimeMillis();
238 if ( t.countsTowardsConnectionLimit( now ) ) {
239 active += t.getActiveConnectionCount();
247 final long now = System.currentTimeMillis();
250 if ( t.countsTowardsConnectionLimit( now ) ) {
251 active += t.getActiveConnectionCount();
259 incomingTransfersByTransferId.remove( transfer.getId() );
263 public static TransferInformation
registerDownload( ImagePublishDataEx img )
throws TTransferRejectedException, TInvocationException
268 if ( !absSource.exists() ) {
269 LOGGER.error( absSource.toString() +
" missing!" );
272 }
catch ( SQLException e ) {
274 throw new TTransferRejectedException(
"File missing on server" );
276 if ( absSource.length() != img.fileSize ) {
277 LOGGER.error( absSource.toString() +
" has unexpected size (is: " + absSource.length() +
", should: " + img.fileSize +
")" );
280 }
catch ( SQLException e ) {
282 throw new TTransferRejectedException(
"File corrupted on server" );
286 throw new TTransferRejectedException(
"Too many active transfers" );
288 plainListener.start();
291 outgoingTransfers.put( transfer.getId(), transfer );
293 return transfer.getTransferInfo();
static int getFiletransferPortSsl()
static String getImageDir()
final String imageVersionId
static int getFiletransferTimeout()
static final Logger LOGGER
static Map< String, OutgoingTransfer > outgoingTransfers
void incomingDownloadRequest(final Uploader uploader)
Server is uploading - client is downloading!
Object getImageVersionId()
static IncomingTransfer getUploadByToken(String uploadToken)
Represents an image in the database.
Class to handle all incoming and outgoing connections.
static final Listener sslListener
synchronized void cancel()
static IncomingTransfer getExistingUpload(ImagePublishData imageData, List< ByteBuffer > crcSums)
Class to hold global constants and properties from 'config/global.properties'.
static String getSslKeystoreFile()
static TransferInformation registerDownload(ImagePublishDataEx img)
static final int MAX_TRANSFERS
static int getDownloadConnectionCount()
static IncomingEvent eventHandler
static int getPlainPort()
static void removeUpload(IncomingTransfer transfer)
static void markValid(String imageVersionId, boolean isValid)
static int getFiletransferPortPlain()
static String getSslKeystorePassword()
static int getUploadConnectionCount()
void incomingUploadRequest(final Downloader downloader)
Server is downloading - client is uploading!
static final Listener plainListener
static final Map< String, IncomingTransfer > incomingTransfersByVersionId
final ExecutorService transferPool
static IncomingTransfer registerUpload(ImagePublishData img, List< ByteBuffer > blockHashes, ImagePublishDataEx existing)
Registers a new incoming transfer from a satellite server.
static Map< String, IncomingTransfer > incomingTransfersByTransferId