View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.activeio.journal.howl;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  
23  import org.activeio.Packet;
24  import org.activeio.journal.InvalidRecordLocationException;
25  import org.activeio.journal.Journal;
26  import org.activeio.journal.JournalEventListener;
27  import org.activeio.journal.RecordLocation;
28  import org.activeio.packet.ByteArrayPacket;
29  import org.objectweb.howl.log.Configuration;
30  import org.objectweb.howl.log.InvalidFileSetException;
31  import org.objectweb.howl.log.InvalidLogBufferException;
32  import org.objectweb.howl.log.InvalidLogKeyException;
33  import org.objectweb.howl.log.LogConfigurationException;
34  import org.objectweb.howl.log.LogEventListener;
35  import org.objectweb.howl.log.LogRecord;
36  import org.objectweb.howl.log.Logger;
37  
38  /***
39   * An implementation of the Journal interface using a HOWL logger.  This is is a thin
40   * wrapper around a HOWL logger.
41   * 
42   * This implementation can be used to write records but not to retreive them
43   * yet. Once the HOWL logger implements the methods needed to retreive
44   * previously stored records, this class can be completed.
45   * 
46   * @version $Revision: 1.2 $
47   */
48  public class HowlJournal implements Journal {
49  
50  	private final Logger logger;
51  
52  	private RecordLocation lastMark;
53  
54  	public HowlJournal(Configuration configuration)
55  			throws InvalidFileSetException, LogConfigurationException,
56  			InvalidLogBufferException, ClassNotFoundException, IOException,
57  			InterruptedException {
58  		this.logger = new Logger(configuration);
59  		this.logger.open();
60  		lastMark = new LongRecordLocation(logger.getActiveMark());
61  	}
62  
63  	/***
64  	 * @see org.activeio.journal.Journal#write(byte[], boolean)
65  	 */
66  	public RecordLocation write(Packet packet, boolean sync) throws IOException {
67  		try {
68  			return new LongRecordLocation(logger.put(packet.sliceAsBytes(), sync));
69  		} catch (InterruptedException e) {
70  			throw (InterruptedIOException) new InterruptedIOException()
71  					.initCause(e);
72  		} catch (IOException e) {
73  			throw e;
74  		} catch (Exception e) {
75  			throw (IOException) new IOException("Journal write failed: " + e)
76  					.initCause(e);
77  		}
78  	}
79  
80  	/***
81  	 * @see org.activeio.journal.Journal#setMark(org.codehaus.activemq.journal.RecordLocation, boolean)
82  	 */
83  	public void setMark(RecordLocation recordLocator, boolean force)
84  			throws InvalidRecordLocationException, IOException {
85  		try {
86  			long location = toLong(recordLocator);
87  			logger.mark(location, force);
88  			lastMark = recordLocator;
89  
90  		} catch (InterruptedException e) {
91  			throw (InterruptedIOException) new InterruptedIOException()
92  					.initCause(e);
93  		} catch (IOException e) {
94  			throw e;
95  		} catch (InvalidLogKeyException e) {
96  			throw new InvalidRecordLocationException(e.getMessage(), e);
97  		} catch (Exception e) {
98  			throw (IOException) new IOException("Journal write failed: " + e)
99  					.initCause(e);
100 		}
101 	}
102 	
103 	/***
104      * @param recordLocator
105      * @return
106      * @throws InvalidRecordLocationException
107      */
108     private long toLong(RecordLocation recordLocator) throws InvalidRecordLocationException {
109         if (recordLocator == null
110         		|| recordLocator.getClass() != LongRecordLocation.class)
111         	throw new InvalidRecordLocationException();
112 
113         long location = ((LongRecordLocation) recordLocator)
114         		.getLongLocation();
115         return location;
116     }
117 
118     /***
119 	 * @see org.activeio.journal.Journal#getMark()
120 	 */
121 	public RecordLocation getMark() {
122 		return lastMark;
123 	}
124 
125 	/***
126 	 * @see org.activeio.journal.Journal#close()
127 	 */
128 	public void close() throws IOException {
129 		try {
130 			logger.close();
131 		} catch (IOException e) {
132 			throw e;
133 		} catch (InterruptedException e) {
134 			throw (InterruptedIOException) new InterruptedIOException()
135 					.initCause(e);
136 		} catch (Exception e) {
137 			throw (IOException) new IOException("Journal close failed: " + e)
138 					.initCause(e);
139 		}
140 	}
141 
142 	/***
143 	 * @see org.activeio.journal.Journal#setJournalEventListener(org.codehaus.activemq.journal.JournalEventListener)
144 	 */
145 	public void setJournalEventListener(final JournalEventListener eventListener) {
146 		logger.setLogEventListener(new LogEventListener() {
147 			public void logOverflowNotification(long key) {
148 				eventListener.overflowNotification(new LongRecordLocation(key));
149 			}
150 		});
151 	}
152 
153 	/***
154 	 * @see org.activeio.journal.Journal#getNextRecordLocation(org.codehaus.activemq.journal.RecordLocation)
155 	 */
156 	public RecordLocation getNextRecordLocation(RecordLocation lastLocation)
157 			throws InvalidRecordLocationException {
158 	    
159 	    if( lastLocation ==null ) {
160 	        if( this.lastMark !=null ) {
161 	            lastLocation = lastMark;
162 	        } else {
163 	            return null;
164 	        }
165 	    }
166 	    
167 	    try {
168 	        while(true) {
169 	            LogRecord record = logger.get(null, toLong(lastLocation));
170 		        // I assume getNext will return null if there is no next record. 
171 	            LogRecord next = logger.getNext(record);
172 	            if( next==null || next.length == 0 )
173 	                return null;
174 	            lastLocation = new LongRecordLocation(next.key);
175 	            if( !next.isCTRL() )
176 	                return lastLocation;
177 	        }
178 		} catch (Exception e) {
179 			throw (InvalidRecordLocationException)new InvalidRecordLocationException().initCause(e);
180         }
181 		
182 	}
183 
184 	/***
185 	 * @see org.activeio.journal.Journal#read(org.codehaus.activemq.journal.RecordLocation)
186 	 */
187 	public Packet read(RecordLocation location)
188 			throws InvalidRecordLocationException, IOException {
189 	    
190 	    try {
191             LogRecord record = logger.get(null, toLong(location));
192             return new ByteArrayPacket(record.data);            
193 		} catch (InvalidLogKeyException e) {
194 			throw new InvalidRecordLocationException(e.getMessage(), e);
195 		} catch (Exception e) {
196 			throw (IOException) new IOException("Journal write failed: " + e)
197 					.initCause(e);
198 		}
199 		
200 	}
201 
202 }