1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.activeio.journal;
19  
20  import java.io.IOException;
21  import java.io.PrintWriter;
22  import java.io.StringWriter;
23  
24  import org.activeio.Packet;
25  import org.activeio.stats.CountStatisticImpl;
26  import org.activeio.stats.IndentPrinter;
27  import org.activeio.stats.TimeStatisticImpl;
28  
29  /***
30   * A Journal filter that captures performance statistics of the filtered Journal.
31   * 
32   * @version $Revision: 1.1 $
33   */
34  public class JournalStatsFilter implements Journal {
35  	
36  	private final TimeStatisticImpl writeLatency = new TimeStatisticImpl("writeLatency", "The amount of time that is spent waiting for a record to be written to the Journal"); 
37  	private final CountStatisticImpl writeRecordsCounter = new CountStatisticImpl("writeRecordsCounter","The number of records that have been written by the Journal");
38  	private final CountStatisticImpl writeBytesCounter = new CountStatisticImpl("writeBytesCounter","The number of bytes that have been written by the Journal");
39  	private final TimeStatisticImpl synchedWriteLatency = new TimeStatisticImpl(writeLatency, "synchedWriteLatency", "The amount of time that is spent waiting for a synch record to be written to the Journal"); 
40  	private final TimeStatisticImpl unsynchedWriteLatency = new TimeStatisticImpl(writeLatency, "unsynchedWriteLatency", "The amount of time that is spent waiting for a non synch record to be written to the Journal"); 	
41  	private final TimeStatisticImpl readLatency = new TimeStatisticImpl("readLatency", "The amount of time that is spent waiting for a record to be read from the Journal"); 
42  	private final CountStatisticImpl readBytesCounter = new CountStatisticImpl("readBytesCounter","The number of bytes that have been read by the Journal");
43  	
44  	private final Journal next;
45  	private boolean detailedStats;
46  
47  	
48  	/***
49  	 * Creates a JournalStatsFilter that captures performance information of <code>next</next>. 
50  	 * @param next
51  	 */
52  	public JournalStatsFilter(Journal next) {
53  		this.next = next;
54  	}
55  	
56  	/***
57  	 * @see org.codehaus.activemq.journal.Journal#write(byte[], boolean)
58  	 */
59  	public RecordLocation write(Packet data, boolean sync) throws IOException {
60  		//writeWaitTimeStat
61  		long start = System.currentTimeMillis();
62  		RecordLocation answer = next.write(data, sync);
63  		long end = System.currentTimeMillis();
64  		
65  		writeRecordsCounter.increment();
66  		writeBytesCounter.add(data.remaining());
67  		if( sync )
68  			synchedWriteLatency.addTime(end-start);
69  		else 
70  			unsynchedWriteLatency.addTime(end-start);
71  		return answer;
72  	}
73  
74  	/***
75  	 * @see org.codehaus.activemq.journal.Journal#read(org.codehaus.activemq.journal.RecordLocation)
76  	 */
77  	public Packet read(RecordLocation location)
78  			throws InvalidRecordLocationException, IOException {
79  		
80  		long start = System.currentTimeMillis();
81  		Packet answer = next.read(location);		
82  		long end = System.currentTimeMillis();
83  		
84  		readBytesCounter.add(answer.remaining());
85  		readLatency.addTime(end-start);
86  		return answer;
87  	}
88  
89  	/***
90  	 * @see org.codehaus.activemq.journal.Journal#setMark(org.codehaus.activemq.journal.RecordLocation, boolean)
91  	 */
92  	public void setMark(RecordLocation recordLocator, boolean force)
93  			throws InvalidRecordLocationException, IOException {
94  		next.setMark(recordLocator, force);
95  	}
96  
97  	/***
98  	 * @see org.codehaus.activemq.journal.Journal#getMark()
99  	 */
100 	public RecordLocation getMark() {
101 		return next.getMark();
102 	}
103 
104 	/***
105 	 * @see org.codehaus.activemq.journal.Journal#close()
106 	 */
107 	public void close() throws IOException {
108 		next.close();
109 	}
110 	
111 	/***
112 	 * @see org.codehaus.activemq.journal.Journal#setJournalEventListener(org.codehaus.activemq.journal.JournalEventListener)
113 	 */
114 	public void setJournalEventListener(JournalEventListener eventListener) {
115 	    next.setJournalEventListener(eventListener);
116 	}
117 
118 	/***
119 	 * @see org.codehaus.activemq.journal.Journal#getNextRecordLocation(org.codehaus.activemq.journal.RecordLocation)
120 	 */
121 	public RecordLocation getNextRecordLocation(RecordLocation lastLocation)
122 			throws IOException, InvalidRecordLocationException {		
123 		return next.getNextRecordLocation(lastLocation);
124 	}
125 	
126 	/***
127 	 * Writes the gathered statistics to the <code>out</code> object.
128 	 * 
129 	 * @param out
130 	 */
131     public void dump(IndentPrinter out) {
132         out.printIndent();
133         out.println("Journal Stats {");        
134         out.incrementIndent();
135         out.printIndent();
136         out.println("Throughput           : "+ getThroughputKps() +" k/s and " + getThroughputRps() +" records/s" );
137         out.printIndent();
138         out.println("Latency with force   : "+ getAvgSyncedLatencyMs() +" ms"  );
139         out.printIndent();
140         out.println("Latency without force: "+ getAvgUnSyncedLatencyMs() +" ms"  );
141 
142         out.printIndent();
143         out.println("Raw Stats {");
144         out.incrementIndent();
145                 
146         out.printIndent();
147         out.println(writeRecordsCounter);
148         out.printIndent();
149         out.println(writeBytesCounter);
150         out.printIndent();
151         out.println(writeLatency);
152         out.incrementIndent();
153         out.printIndent();
154         out.println(synchedWriteLatency);
155         out.printIndent();
156         out.println(unsynchedWriteLatency);
157         out.decrementIndent();
158 
159         out.printIndent();
160         out.println(readBytesCounter);
161         
162         out.printIndent();
163         out.println(readLatency);        
164         out.decrementIndent();
165         out.printIndent();
166         out.println("}");
167         
168         out.decrementIndent();
169         out.printIndent();
170         out.println("}");
171 
172     }
173 
174     /***
175      * Dumps the stats to a String.
176      * 
177      * @see java.lang.Object#toString()
178      */
179 	public String toString() {
180 		if( detailedStats ) {
181 			StringWriter w = new StringWriter();
182 			PrintWriter pw = new PrintWriter(w);		
183 			dump(new IndentPrinter(pw, "  "));
184 			return w.getBuffer().toString();
185 		} else {
186 			StringWriter w = new StringWriter();
187 			PrintWriter pw = new PrintWriter(w);
188 			IndentPrinter out = new IndentPrinter(pw, "  ");
189 	        out.println("Throughput           : "+ getThroughputKps() +" k/s and " + getThroughputRps() +" records/s");
190 	        out.printIndent();
191 	        out.println("Latency with force   : "+getAvgSyncedLatencyMs()+" ms"  );
192 	        out.printIndent();
193 	        out.println("Latency without force: "+getAvgUnSyncedLatencyMs()+" ms"  );
194 			return w.getBuffer().toString();			
195 		}
196     }
197 
198 	/***
199 	 * @param detailedStats true if details stats should be displayed by <code>toString()</code> and <code>dump</code>
200 	 * @return
201 	 */
202 	public JournalStatsFilter enableDetailedStats(boolean detailedStats) {		
203 		this.detailedStats = detailedStats;
204 		return this;
205 	}
206 
207 	/***
208 	 * Gets the average throughput in k/s.
209 	 * 
210 	 * @return the average throughput in k/s.
211 	 */
212 	public double getThroughputKps() {
213 		 long totalTime = writeBytesCounter.getLastSampleTime()-writeBytesCounter.getStartTime(); 
214 		 return (((double)writeBytesCounter.getCount()/(double)totalTime)/(double)1024)*1000;
215 	}
216 
217 	/***
218 	 * Gets the average throughput in records/s.
219 	 * 
220 	 * @return the average throughput in records/s.
221 	 */
222 	public double getThroughputRps() {
223 		 long totalTime = writeRecordsCounter.getLastSampleTime()-writeRecordsCounter.getStartTime(); 
224 		 return (((double)writeRecordsCounter.getCount()/(double)totalTime))*1000;
225 	}
226 
227 	/***
228 	 * Gets the average number of writes done per second
229 	 * 
230 	 * @return the average number of writes in w/s.
231 	 */
232 	public double getWritesPerSecond() {
233 		 return writeLatency.getAveragePerSecond();
234 	}
235 
236 	/***
237 	 * Gets the average sync write latency in ms.
238 	 * 
239 	 * @return the average sync write latency in ms.
240 	 */
241 	public double getAvgSyncedLatencyMs() {
242 		return synchedWriteLatency.getAverageTime();
243 	}
244 
245 	/***
246 	 * Gets the average non sync write latency in ms.
247 	 * 
248 	 * @return the average non sync write latency in ms.
249 	 */
250 	public double getAvgUnSyncedLatencyMs() {
251 		return unsynchedWriteLatency.getAverageTime();
252 	}
253 	
254 	/***
255 	 * Resets the stats sample.
256 	 */
257 	public void reset() {
258 		writeLatency.reset(); 
259 		writeBytesCounter.reset();
260 		writeRecordsCounter.reset();
261 		synchedWriteLatency.reset(); 
262 		unsynchedWriteLatency.reset(); 	
263 		readLatency.reset(); 
264 		readBytesCounter.reset();
265 	}
266 }