bwLehrpool Masterserver
Manages authentication and sharing of virtual machines between participating institutions
DbImageBlock.java
Go to the documentation of this file.
1 package org.openslx.imagemaster.db.mappers;
2 
3 import java.nio.ByteBuffer;
4 import java.sql.ResultSet;
5 import java.sql.SQLException;
6 import java.util.ArrayList;
7 import java.util.List;
8 import java.util.concurrent.ArrayBlockingQueue;
9 
10 import org.apache.logging.log4j.LogManager;
11 import org.apache.logging.log4j.Logger;
12 import org.openslx.filetransfer.FileRange;
13 import org.openslx.filetransfer.util.ChunkStatus;
14 import org.openslx.filetransfer.util.FileChunk;
18 
19 public class DbImageBlock
20 {
21 
22  private static final Logger LOGGER = LogManager.getLogger( DbImageBlock.class );
23 
24  private static AsyncThread asyncBlockUpdate = null;
25 
26  private static synchronized void initAsyncThread()
27  {
28  if ( asyncBlockUpdate == null ) {
29  asyncBlockUpdate = new AsyncThread();
30  asyncBlockUpdate.start();
31  }
32  }
33 
34  public static void asyncUpdate( String imageVersionId, FileChunk chunk ) throws InterruptedException
35  {
37  asyncBlockUpdate.put( new ChunkUpdate( imageVersionId, chunk.range, chunk.getStatus() != ChunkStatus.COMPLETE ) );
38  }
39 
40  private static class AsyncThread extends Thread
41  {
42  private final ArrayBlockingQueue<ChunkUpdate> queue = new ArrayBlockingQueue<>( 100 );
43 
44  public void put( ChunkUpdate chunk ) throws InterruptedException
45  {
46  queue.put( chunk );
47  }
48 
49  @Override
50  public void run()
51  {
52  try {
53  while ( !interrupted() ) {
54  ChunkUpdate chunk = queue.take();
55  Thread.sleep( 100 );
56  try ( MysqlConnection connection = Database.getConnection() ) {
57  MysqlStatement stmt = connection.prepareStatement( "UPDATE imageblock SET ismissing = :ismissing"
58  + " WHERE imageversionid = :imageversionid AND startbyte = :startbyte AND blocksize = :blocksize" );
59  do {
60  stmt.setBoolean( "ismissing", chunk.isMissing );
61  stmt.setString( "imageversionid", chunk.imageVersionId );
62  stmt.setLong( "startbyte", chunk.range.startOffset );
63  stmt.setInt( "blocksize", chunk.range.getLength() );
64  stmt.executeUpdate();
65  chunk = queue.poll();
66  } while ( chunk != null );
67  connection.commit();
68  } catch ( SQLException e ) {
69  LOGGER.error( "Query failed in DbImageBlock.AsyncThread.run()", e );
70  continue;
71  }
72  Thread.sleep( 2000 );
73  }
74  } catch ( InterruptedException e ) {
75  LOGGER.debug( "async thread interrupted" );
76  interrupt();
77  }
78  }
79  }
80 
81  private static class ChunkUpdate
82  {
83  public final String imageVersionId;
84  public final FileRange range;
85  public final boolean isMissing;
86 
87  public ChunkUpdate( String imageVersionId, FileRange range, boolean isMissing )
88  {
89  this.imageVersionId = imageVersionId;
90  this.range = range;
91  this.isMissing = isMissing;
92  }
93  }
94 
95  public static void insertChunkList( String imageVersionId, List<FileChunk> all, boolean missing ) throws SQLException
96  {
97  try ( MysqlConnection connection = Database.getConnection() ) {
98  MysqlStatement stmt = connection.prepareStatement( "INSERT IGNORE INTO imageblock"
99  + " (imageversionid, startbyte, blocksize, blocksha1, ismissing) VALUES"
100  + " (:imageversionid, :startbyte, :blocksize, :blocksha1, :ismissing)" );
101  stmt.setString( "imageversionid", imageVersionId );
102  stmt.setBoolean( "ismissing", missing );
103  for ( FileChunk chunk : all ) {
104  stmt.setLong( "startbyte", chunk.range.startOffset );
105  stmt.setInt( "blocksize", chunk.range.getLength() );
106  stmt.setBinary( "blocksha1", chunk.getSha1Sum() );
107  stmt.executeUpdate();
108  }
109  connection.commit();
110  } catch ( SQLException e ) {
111  LOGGER.error( "Query failed in DbImageBlock.insertChunkList()", e );
112  throw e;
113  }
114  }
115 
116  public static List<Boolean> getMissingStatusList( String imageVersionId ) throws SQLException
117  {
118  try ( MysqlConnection connection = Database.getConnection() ) {
119  MysqlStatement stmt = connection.prepareStatement( "SELECT startbyte, ismissing FROM imageblock"
120  + " WHERE imageversionid = :imageversionid ORDER BY startbyte ASC" );
121  stmt.setString( "imageversionid", imageVersionId );
122  ResultSet rs = stmt.executeQuery();
123  List<Boolean> list = new ArrayList<>();
124  long expectedOffset = 0;
125  while ( rs.next() ) {
126  long currentOffset = rs.getLong( "startbyte" );
127  if ( currentOffset < expectedOffset )
128  continue;
129  while ( currentOffset > expectedOffset ) {
130  list.add( Boolean.TRUE );
131  expectedOffset += FileChunk.CHUNK_SIZE;
132  }
133  if ( currentOffset == expectedOffset ) {
134  list.add( rs.getBoolean( "ismissing" ) );
135  expectedOffset += FileChunk.CHUNK_SIZE;
136  }
137  }
138  return list;
139  } catch ( SQLException e ) {
140  LOGGER.error( "Query failed in DbImageBlock.getBlockStatuses()", e );
141  throw e;
142  }
143  }
144 
145  public static List<ByteBuffer> getBlockHashes( String imageVersionId ) throws SQLException
146  {
147  try ( MysqlConnection connection = Database.getConnection() ) {
148  return getBlockHashes( connection, imageVersionId );
149  } catch ( SQLException e ) {
150  LOGGER.error( "Query failed in DbImageBlock.getBlockHashes()", e );
151  throw e;
152  }
153  }
154 
155  private static List<ByteBuffer> getBlockHashes( MysqlConnection connection, String imageVersionId )
156  throws SQLException
157  {
158  MysqlStatement stmt = connection.prepareStatement( "SELECT startbyte, blocksha1 FROM imageblock"
159  + " WHERE imageversionid = :imageversionid ORDER BY startbyte ASC" );
160  stmt.setString( "imageversionid", imageVersionId );
161  ResultSet rs = stmt.executeQuery();
162  List<ByteBuffer> list = new ArrayList<>();
163  long expectedOffset = 0;
164  while ( rs.next() ) {
165  long currentOffset = rs.getLong( "startbyte" );
166  if ( currentOffset < expectedOffset )
167  continue;
168  while ( currentOffset > expectedOffset ) {
169  list.add( null );
170  expectedOffset += FileChunk.CHUNK_SIZE;
171  }
172  if ( currentOffset == expectedOffset ) {
173  list.add( ByteBuffer.wrap( rs.getBytes( "blocksha1" ) ) );
174  expectedOffset += FileChunk.CHUNK_SIZE;
175  }
176  }
177  return list;
178  }
179 
180 }
static List< Boolean > getMissingStatusList(String imageVersionId)
static void insertChunkList(String imageVersionId, List< FileChunk > all, boolean missing)
static MysqlConnection getConnection()
Get a connection to the database.
Definition: Database.java:92
ResultSet executeQuery()
Executes the statement, which must be a query.
int executeUpdate()
Executes the statement, which must be an SQL INSERT, UPDATE or DELETE statement; or an SQL statement ...
void setInt(String name, int value)
Sets a parameter.
void setBoolean(String name, boolean value)
Sets a parameter.
ChunkUpdate(String imageVersionId, FileRange range, boolean isMissing)
void setString(String name, String value)
Sets a parameter.
final ArrayBlockingQueue< ChunkUpdate > queue
Class for creating PreparedStatements with named parameters.
void setBinary(String name, byte[] value)
Sets a parameter.
static List< ByteBuffer > getBlockHashes(MysqlConnection connection, String imageVersionId)
static void asyncUpdate(String imageVersionId, FileChunk chunk)
static List< ByteBuffer > getBlockHashes(String imageVersionId)
void setLong(String name, long value)
Sets a parameter.