1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activeio.journal.active;
19
20 import java.io.DataInput;
21 import java.io.DataInputStream;
22 import java.io.DataOutput;
23 import java.io.DataOutputStream;
24 import java.io.File;
25 import java.io.IOException;
26 import java.nio.ByteBuffer;
27 import java.text.NumberFormat;
28
29 import org.activeio.Packet;
30 import org.activeio.adapter.PacketInputStream;
31 import org.activeio.adapter.PacketOutputStream;
32 import org.activeio.journal.InvalidRecordLocationException;
33 import org.activeio.packet.ByteArrayPacket;
34 import org.activeio.packet.ByteBufferPacket;
35
36 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
37
38 /***
 * Provides a logical view of many separate files as one single long log file.
 * The separate files that compose the LogFile are segments of the LogFile.
41 * <p/>This class is not thread safe.
42 *
43 * @version $Revision: 1.1 $
44 */
45 final public class LogFileManager {
46
/**
 * Default number of online log files. Read once from the system property
 * {@code org.activeio.journal.active.DefaultLogFileCount}; 2 when the property is unset.
 */
public static final int DEFAULT_LOGFILE_COUNT = Integer.parseInt(
        System.getProperty("org.activeio.journal.active.DefaultLogFileCount", String.valueOf(2)));

/**
 * Default initial size handed to each log file. Read once from the system property
 * {@code org.activeio.journal.active.DefaultLogFileSize}; 1024*1024*20 when unset
 * (presumably bytes -- confirm against LogFile).
 */
public static final int DEFAULT_LOGFILE_SIZE = Integer.parseInt(
        System.getProperty("org.activeio.journal.active.DefaultLogFileSize", String.valueOf(1024 * 1024 * 20)));

/**
 * Size of the manager's own part of the control data: an int (last log file id) and two
 * booleans (mark present, clean shutdown) plus one serialized Location, as written by
 * storeState().
 */
public static final int SERIALIZED_SIZE = 6 + Location.SERIALIZED_SIZE;

/** Record type that getNextDataRecordLocation() looks for when scanning forward. */
public static final byte DATA_RECORD_TYPE = 1;
/** Record type that checkAppendLog() treats as carrying a mark. */
public static final byte MARK_RECORD_TYPE = 2;

/**
 * Formats a log file index as exactly three integer digits for the "log-NNN.dat" file names.
 * NOTE(review): getNumberInstance() uses the JVM default locale and NumberFormat instances
 * are not synchronized -- confirm neither matters for the deployments of this class.
 */
private static final NumberFormat onlineLogNameFormat = NumberFormat.getNumberInstance();
static {
    onlineLogNameFormat.setGroupingUsed(false);
    onlineLogNameFormat.setParseIntegerOnly(true);
    onlineLogNameFormat.setMinimumIntegerDigits(3);
    onlineLogNameFormat.setMaximumIntegerDigits(3);
    onlineLogNameFormat.setMaximumFractionDigits(0);
}

/** Directory holding control.dat and the log-NNN.dat files. */
private final File logDirectory;
/** Number of log files created by initialize(). */
private final int onlineLogFileCount;
/** Initial size passed to every LogFile created by initialize(). */
private final int initialLogFileSize;
/** Counter incremented in activateNextLogFile() and decremented in updateMark(). */
private final SynchronizedInt activeLogFileCount = new SynchronizedInt(0);

/** Entry point into the circular list of nodes built by initialize(). */
private LogFileNode firstNode;
/** Active node with the lowest id; where reads by id start. */
private LogFileNode firstActiveNode;
/** Next node to be activated, or null when none is available. */
private LogFileNode firstInactiveNode;
/** Node that append() writes to. */
private LogFileNode appendNode;

/** Control file that persists the state written by storeState(). */
private ControlFile controlFile;
/** Id handed to the most recently activated log file; -1 before the first activation. */
private int lastLogFileId = -1;
/** Last mark passed to updateMark(), or null if none was recorded. */
private Location lastMark;
/** Set by dispose() so that repeated calls are no-ops. */
private boolean disposed;
/** Flag persisted as true by dispose() and reset to false at the end of initialize(). */
private boolean loadedFromCleanShutDown;
80
81
/**
 * Creates a manager over {@code logDirectory} using {@link #DEFAULT_LOGFILE_COUNT} log files
 * with an initial size of {@link #DEFAULT_LOGFILE_SIZE}.
 *
 * @param logDirectory directory that holds the control file and the log files
 * @throws IOException if initialization of the log files or control file fails
 */
public LogFileManager(File logDirectory) throws IOException {
    this(logDirectory, LogFileManager.DEFAULT_LOGFILE_COUNT, LogFileManager.DEFAULT_LOGFILE_SIZE);
}
85
/**
 * Creates a manager over {@code logDirectory} and immediately initializes it.
 *
 * @param logDirectory       directory that holds the control file and the log files
 * @param onlineLogFileCount number of log files to manage
 * @param initialLogFileSize initial size passed to each log file
 * @throws IOException if initialization fails
 */
public LogFileManager(File logDirectory, int onlineLogFileCount, int initialLogFileSize) throws IOException {
    this.initialLogFileSize = initialLogFileSize;
    this.onlineLogFileCount = onlineLogFileCount;
    this.logDirectory = logDirectory;
    initialize(this.onlineLogFileCount);
}
92
93 void initialize(int onlineLogFileCount) throws IOException {
94
95 LogFileNode logFiles[] = new LogFileNode[onlineLogFileCount];
96
97
98 if (!logDirectory.exists()) {
99 if (!logDirectory.mkdirs()) {
100 throw new IOException("Could not create directory: " + logDirectory);
101 }
102 }
103
104
105 int controlDataSize = SERIALIZED_SIZE + (LogFileNode.SERIALIZED_SIZE*onlineLogFileCount);
106 controlFile = new ControlFile(new File(logDirectory, "control.dat"), controlDataSize);
107
108 controlFile.lock();
109
110
111 for (int i = 0; i < onlineLogFileCount; i++) {
112 LogFile file = new LogFile(new File(logDirectory, "log-" + onlineLogNameFormat.format(i) + ".dat"),
113 initialLogFileSize);
114 logFiles[i] = new LogFileNode(file);
115 }
116
117
118 for (int i = 0; i < onlineLogFileCount; i++) {
119 if (i == (onlineLogFileCount - 1)) {
120 logFiles[i].setNext(logFiles[0]);
121 } else {
122 logFiles[i].setNext(logFiles[i + 1]);
123 }
124 }
125
126 firstNode = logFiles[0];
127 loadState();
128
129
130 for (int i = 0; i < onlineLogFileCount; i++) {
131 if( logFiles[i].isActive() ) {
132 if( firstActiveNode == null || logFiles[i].getId() < firstActiveNode.getId() ) {
133 firstActiveNode = logFiles[i];
134 }
135 }
136 }
137
138
139 if ( firstActiveNode == null ) {
140 firstInactiveNode = logFiles[0];
141 activateNextLogFile();
142 } else {
143
144 firstInactiveNode = null;
145 LogFileNode log = firstActiveNode;
146 do {
147 if( !log.isActive() ) {
148 firstInactiveNode = log;
149 break;
150 } else {
151 appendNode = log;
152 }
153 log = log.getNext();
154 } while (log != firstActiveNode);
155 }
156
157
158
159 if( !this.loadedFromCleanShutDown ) {
160 checkAppendLog();
161 }
162
163 loadedFromCleanShutDown = false;
164 storeState();
165 }
166
167 private void checkAppendLog() throws IOException {
168
169
170
171
172 int offset = 0;
173 Record record = new Record();
174 LogFile logFile = appendNode.getLogFile();
175 Location markLocation=null;
176
177 while( logFile.loadAndCheckRecord(offset, record) ) {
178
179 if( record.getLocation().getLogFileId()!= appendNode.getId() || record.getLocation().getLogFileOffset()!=offset ) {
180
181 break;
182 }
183
184 if ( record.getRecordType()==LogFileManager.MARK_RECORD_TYPE) {
185 markLocation = record.getLocation();
186 }
187
188 offset += record.getRecordLength();
189 }
190
191 appendNode.setAppendOffset(offset);
192
193 if( markLocation!=null ) {
194 try {
195 Packet packet = readPacket(markLocation);
196 markLocation = Location.readFromPacket(packet);
197 } catch (InvalidRecordLocationException e) {
198 throw (IOException)new IOException(e.getMessage()).initCause(e);
199 }
200 updateMark(markLocation);
201 }
202
203 }
204
205 private void storeState() throws IOException {
206 Packet controlData = controlFile.getControlData();
207 if( controlData.remaining() == 0 )
208 return;
209
210 DataOutput data = new DataOutputStream(new PacketOutputStream(controlData));
211
212 data.writeInt(lastLogFileId);
213 data.writeBoolean(lastMark!=null);
214 if( lastMark!=null )
215 lastMark.writeToDataOutput(data);
216 data.writeBoolean(loadedFromCleanShutDown);
217
218
219 LogFileNode log = firstNode;
220 do {
221 log.writeExternal( data );
222 log = log.getNext();
223 } while (log != firstNode);
224
225 controlFile.store();
226 }
227
228 private void loadState() throws IOException {
229 if( controlFile.load() ) {
230 Packet controlData = controlFile.getControlData();
231 if( controlData.remaining() == 0 )
232 return;
233
234 DataInput data = new DataInputStream(new PacketInputStream(controlData));
235
236 lastLogFileId =data.readInt();
237 if( data.readBoolean() )
238 lastMark = Location.readFromDataInput(data);
239 else
240 lastMark = null;
241 loadedFromCleanShutDown = data.readBoolean();
242
243
244 LogFileNode log = firstNode;
245 do {
246 log.readExternal( data );
247 log = log.getNext();
248 } while (log != firstNode);
249 }
250 }
251
252 public void dispose() {
253
254 if (disposed)
255 return;
256 this.disposed = true;
257
258 try {
259
260 LogFileNode log = firstNode;
261 do {
262 log.getLogFile().dispose();
263 log = log.getNext();
264 } while (log != firstNode);
265
266 loadedFromCleanShutDown=true;
267 storeState();
268 controlFile.dispose();
269 } catch ( IOException e ) {
270 }
271
272 }
273
274 private int getNextLogFileId() {
275 return ++lastLogFileId;
276 }
277
278 /***
279 * @param write
280 * @throws IOException
281 */
282 public void append(BatchedWrite write) throws IOException {
283
284 if (!appendNode.isActive())
285 throw new IllegalStateException("Log file is not active. Writes are not allowed");
286 if (appendNode.isReadOnly())
287 throw new IllegalStateException("Log file has been marked Read Only. Writes are not allowed");
288
289
290 LogFile logFile = appendNode.getLogFile();
291 ByteBuffer buffer = ((ByteBufferPacket)write.getPacket().narrow(ByteBufferPacket.class)).getByteBuffer();
292 int size = buffer.remaining();
293 logFile.write(appendNode.getAppendOffset(), buffer);
294 if( write.getForce() )
295 logFile.force();
296
297
298 appendNode.appended(size);
299 if (write.getMark() != null) {
300 updateMark(write.getMark());
301 }
302 }
303
304 /***
305 * @param write
306 * @throws IOException
307 */
308 synchronized private void updateMark(Location mark) throws IOException {
309
310 this.lastMark = mark;
311 while (firstActiveNode != appendNode) {
312 if (firstActiveNode.getId() < lastMark.getLogFileId()) {
313 firstActiveNode.setActive(false);
314 activeLogFileCount.decrement();
315 if( firstInactiveNode ==null )
316 firstInactiveNode = firstActiveNode;
317 firstActiveNode = firstActiveNode.getNextActive();
318 } else {
319 break;
320 }
321 }
322 }
323
324 RecordInfo readRecordInfo(Location location) throws IOException, InvalidRecordLocationException {
325
326 LogFileNode logFileState = getLogFileWithId(location.getLogFileId());
327
328 if (logFileState.getAppendOffset() == location.getLogFileOffset()) {
329 throw new InvalidRecordLocationException("No record at (" + location
330 + ") found. Location past end of logged data.");
331 }
332
333
334 try {
335 LogFile logFile = logFileState.getLogFile();
336 Record header = new Record();
337 logFile.readRecordHeader(location.getLogFileOffset(), header);
338 return new RecordInfo(location, header, logFileState);
339 } catch (IOException e) {
340 throw new InvalidRecordLocationException("No record at (" + location + ") found.");
341 }
342 }
343
344 LogFileNode getLogFileWithId(int logFileId) throws InvalidRecordLocationException {
345 for (LogFileNode lf = firstActiveNode; lf != null; lf = lf.getNextActive()) {
346 if (lf.getId() == logFileId) {
347 return lf;
348 }
349
350
351 if (logFileId < lf.getId())
352 break;
353 }
354 throw new InvalidRecordLocationException("Log file with id;" + logFileId + " is not active.");
355 }
356
357 /***
358 * @param lastLocation
359 * @return
360 */
361 public Location getNextDataRecordLocation(Location lastLocation) throws IOException, InvalidRecordLocationException {
362 RecordInfo ri = readRecordInfo(lastLocation);
363 while (true) {
364
365 int logFileId = ri.getLocation().getLogFileId();
366 int offset = ri.getNextLocation();
367
368
369 if (offset >= ri.getLogFileState().getAppendOffset()) {
370 LogFileNode nextActive = ri.getLogFileState().getNextActive();
371 if (nextActive == null) {
372 return null;
373 }
374 logFileId = nextActive.getId();
375 offset = 0;
376 }
377
378 try {
379 ri = readRecordInfo(new Location(logFileId, offset));
380 } catch (InvalidRecordLocationException e) {
381 return null;
382 }
383
384
385 if (ri.getHeader().getRecordType() == DATA_RECORD_TYPE) {
386 return ri.getLocation();
387 }
388
389 }
390 }
391
392 /***
393 * @param logFileIndex
394 * @param logFileOffset
395 * @return
396 * @throws IOException
397 * @throws InvalidRecordLocationException
398 */
399 public Packet readPacket(Location location) throws IOException, InvalidRecordLocationException {
400
401
402 RecordInfo recordInfo = readRecordInfo(location);
403
404 byte data[] = new byte[recordInfo.getHeader().getPayloadLength()];
405
406 LogFile logFile = recordInfo.getLogFileState().getLogFile();
407 logFile.read(recordInfo.getDataOffset(), data);
408
409 return new ByteArrayPacket(data);
410
411 }
412
413 public int getInitialLogFileSize() {
414 return initialLogFileSize;
415 }
416
417 public Location getFirstActiveLogLocation() {
418 if (firstActiveNode == null)
419 return null;
420 if (firstActiveNode.getAppendOffset() == 0)
421 return null;
422 return new Location(firstActiveNode.getId(), 0);
423 }
424
425 void activateNextLogFile() throws IOException {
426
427
428 if (appendNode != null) {
429 appendNode.setReadOnly(true);
430 }
431
432 LogFileNode next = firstInactiveNode;
433 synchronized (this) {
434 firstInactiveNode = firstInactiveNode.getNextInactive();
435 next.activate(getNextLogFileId());
436 if (firstActiveNode == null) {
437 firstActiveNode = next;
438 }
439 }
440 activeLogFileCount.increment();
441 appendNode = next;
442
443 storeState();
444 }
445
446 /***
447 * @return Returns the logDirectory.
448 */
449 public File getLogDirectory() {
450 return logDirectory;
451 }
452
453 /***
454 * @return Returns the lastMark.
455 */
456 public Location getLastMarkedRecordLocation() {
457 return lastMark;
458 }
459
460 public Location getNextAppendLocation() {
461 return new Location(appendNode.getId(), appendNode.getAppendOffset());
462 }
463
464 /***
465 * @return Returns the onlineLogFileCount.
466 */
467 public int getOnlineLogFileCount() {
468 return onlineLogFileCount;
469 }
470
471 public boolean isPastHalfActive() {
472 return (onlineLogFileCount/2.f) < activeLogFileCount.get();
473 }
474
475 synchronized public Location getFirstRecordLocationOfSecondActiveLogFile() {
476 return firstActiveNode.getNextActive().getFirstRecordLocation();
477 }
478
479 synchronized public boolean canActivateNextLogFile() {
480 return firstInactiveNode!=null;
481 }
482
483 }