View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.activeio.journal.active;
19  
20  import org.activeio.Packet;
21  
22  import EDU.oswego.cs.dl.util.concurrent.Latch;
23  
24  /***
25   * This contains all the data needed to write and force a list of records to a
26   * LogFile. The more records that can be cramed into a single BatchedWrite, the
27   * higher throughput that can be achived by a write and force operation.
28   * 
29   * @version $Revision: 1.1 $
30   */
31  final public class BatchedWrite {
32  
33      private final Packet packet;
34      public Throwable error;
35      private Location mark;
36      private boolean appendDisabled = false;
37      private boolean appendInProgress = false;
38      private Latch writeDoneLatch;
39  
40      /***
41       * @param packet
42       */
43      public BatchedWrite(Packet packet) {
44          this.packet = packet;
45      }
46  
47      /***
48       * @throws InterruptedException
49       * 
50       */
51      synchronized private void disableAppend() throws InterruptedException {
52          appendDisabled = true;
53          while (appendInProgress) {
54              wait();
55          }
56      }
57  
58      /***
59       * @param packet2
60       * @param mark2
61       * @return
62       */
63      public boolean append(Record record, Location recordMark, boolean force) {
64  
65          synchronized (this) {
66              if (appendDisabled)
67                  return false;
68              appendInProgress = true;
69          }
70          
71          
72          if( force && writeDoneLatch==null)
73              writeDoneLatch = new Latch();
74          
75          record.read(packet);
76  
77          // if we fit the record in this batch
78          if ( !record.hasRemaining() ) {
79              if (recordMark != null)
80                  mark = recordMark;
81          }
82  
83          synchronized (this) {
84              appendInProgress = false;
85              this.notify();
86  
87              if (appendDisabled)
88                  return false;
89              else
90                  return packet.remaining() > 0;
91          }
92      }
93  
94      public void waitForForce() throws Throwable {
95          if( writeDoneLatch!=null ) {
96              writeDoneLatch.acquire();
97              synchronized (this) {
98                  if (error != null)
99                      throw error;
100             }
101         }
102     }
103 
104     public void forced() {
105         if( writeDoneLatch!=null ) {
106             writeDoneLatch.release();
107         }
108     }
109 
110     public void writeFailed(Throwable error) {
111         if( writeDoneLatch!=null ) {
112             synchronized (this) {
113                 this.error = error;
114             }
115             writeDoneLatch.release();
116         }
117     }
118 
119     public Packet getPacket() {
120         return packet;
121     }
122 
123     /***
124      * @return
125      */
126     public Location getMark() {
127         return mark;
128     }
129 
130     /***
131      * @throws InterruptedException
132      * 
133      */
134     public void flip() throws InterruptedException {
135         disableAppend();
136         packet.flip();
137     }
138 
139     public boolean getForce() {
140         return writeDoneLatch!=null;
141     }
142 
143 }