001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.store.jdbc.adapter; 019 020 import org.activemq.store.jdbc.StatementProvider; 021 022 023 /** 024 * @version $Revision: 1.1 $ 025 */ 026 public class DefaultStatementProvider implements StatementProvider { 027 028 protected String tablePrefix = ""; 029 protected String messageTableName = "ACTIVEMQ_MSGS"; 030 protected String txTableName = "ACTIVEMQ_TXS"; 031 protected String durableSubAcksTableName = "ACTIVEMQ_ACKS"; 032 033 protected String binaryDataType = "BLOB"; 034 protected String containerNameDataType = "VARCHAR(250)"; 035 protected String xidDataType = "VARCHAR(250)"; 036 protected String msgIdDataType = "VARCHAR(250)"; 037 protected String subscriptionIdDataType = "VARCHAR(250)"; 038 protected String sequenceDataType = "INTEGER"; 039 protected String longDataType = "BIGINT"; 040 protected String stringIdDataType = "VARCHAR(250)"; 041 042 043 public String [] getCreateSchemaStatments() { 044 return new String[]{ 045 "CREATE TABLE "+tablePrefix+messageTableName+"(" 046 +"ID "+sequenceDataType+" NOT NULL" 047 +", CONTAINER "+containerNameDataType 048 +", MSGID "+msgIdDataType 049 +", MSG "+binaryDataType 050 +", PRIMARY KEY ( ID ) )", 051 "CREATE INDEX "+tablePrefix+messageTableName+"_MIDX ON "+tablePrefix+messageTableName+" (MSGID)", 052 "CREATE INDEX "+tablePrefix+messageTableName+"_CIDX ON "+tablePrefix+messageTableName+" (CONTAINER)", 053 054 "CREATE TABLE "+tablePrefix+txTableName+"(" 055 +"XID "+xidDataType+" NOT NULL" 056 +", PRIMARY KEY ( XID ))", 057 058 "CREATE TABLE "+tablePrefix+durableSubAcksTableName+"(" 059 +"SUB "+subscriptionIdDataType+" NOT NULL" 060 +", CONTAINER "+containerNameDataType+" NOT NULL" 061 +", LAST_ACKED_ID "+sequenceDataType 062 +", SE_ID INTEGER" 063 +", SE_CLIENT_ID "+stringIdDataType 064 +", SE_CONSUMER_NAME "+stringIdDataType 065 +", SE_SELECTOR "+stringIdDataType 066 +", PRIMARY KEY ( SUB, CONTAINER ))", 067 "CREATE INDEX "+tablePrefix+durableSubAcksTableName+"_CIDX ON "+tablePrefix+durableSubAcksTableName+" (CONTAINER)", 068 "ALTER TABLE "+tablePrefix+messageTableName+" ADD EXPIRATION "+longDataType, 069 }; 070 } 071 072 public String [] getDropSchemaStatments() { 073 return new String[]{ 074 "DROP TABLE "+tablePrefix+durableSubAcksTableName+"", 075 "DROP TABLE "+tablePrefix+messageTableName+"", 076 "DROP TABLE "+tablePrefix+txTableName+"" 077 }; 078 } 079 080 public String getAddMessageStatment() { 081 return "INSERT INTO "+tablePrefix+messageTableName+"(ID, CONTAINER, MSGID, MSG, EXPIRATION) VALUES (?, ?, ?, ?, ?)"; 082 } 083 public String getUpdateMessageStatment() { 084 return "UPDATE "+tablePrefix+messageTableName+" SET MSG=? WHERE ID=?"; 085 } 086 public String getRemoveMessageStatment() { 087 return "DELETE FROM "+tablePrefix+messageTableName+" WHERE ID=?"; 088 } 089 public String getFindMessageSequenceIdStatment() { 090 return "SELECT ID FROM "+tablePrefix+messageTableName+" WHERE MSGID=?"; 091 } 092 public String getFindMessageStatment() { 093 return "SELECT MSG FROM "+tablePrefix+messageTableName+" WHERE ID=?"; 094 } 095 public String getFindAllMessagesStatment() { 096 return "SELECT ID, MSGID FROM "+tablePrefix+messageTableName+" WHERE CONTAINER=? ORDER BY ID"; 097 } 098 public String getFindLastSequenceIdInMsgs() { 099 return "SELECT MAX(ID) FROM "+tablePrefix+messageTableName; 100 } 101 public String getFindLastSequenceIdInAcks() { 102 return "SELECT MAX(LAST_ACKED_ID) FROM "+tablePrefix+durableSubAcksTableName; 103 } 104 105 public String getAddXidStatment() { 106 return "INSERT INTO "+tablePrefix+txTableName+"(XID) VALUES (?)"; 107 } 108 public String getRemoveXidStatment() { 109 return "DELETE FROM "+tablePrefix+txTableName+" WHERE XID=?"; 110 } 111 public String getFindAllXidStatment() { 112 return "SELECT XID FROM "+tablePrefix+txTableName+""; 113 } 114 115 public String getCreateDurableSubStatment() { 116 return "INSERT INTO "+tablePrefix+durableSubAcksTableName 117 +"(SE_ID, SE_CLIENT_ID, SE_CONSUMER_NAME, SE_SELECTOR, SUB, CONTAINER, LAST_ACKED_ID) " 118 +"VALUES (?, ?, ?, ?, ?, ?, ?)"; 119 } 120 121 public String getUpdateDurableSubStatment() { 122 return "UPDATE "+tablePrefix+durableSubAcksTableName 123 +" SET SE_ID=?, SE_CLIENT_ID=?, SE_CONSUMER_NAME=?, SE_SELECTOR=? WHERE SUB=? AND CONTAINER=?"; 124 } 125 126 public String getFindDurableSubStatment() { 127 return "SELECT SE_ID, SE_CLIENT_ID, SE_CONSUMER_NAME, SE_SELECTOR, CONTAINER=? "+tablePrefix+durableSubAcksTableName 128 +" WHERE SUB=? AND CONTAINER=?"; 129 } 130 131 public String getUpdateLastAckOfDurableSub() { 132 return "UPDATE "+tablePrefix+durableSubAcksTableName 133 +" SET LAST_ACKED_ID=? WHERE SUB=? AND CONTAINER=?"; 134 } 135 136 public String getDeleteSubscriptionStatment() { 137 return "DELETE FROM "+tablePrefix+durableSubAcksTableName 138 +" WHERE SUB=? AND CONTAINER=?"; 139 } 140 141 public String getFindAllDurableSubMessagesStatment() { 142 return "SELECT M.ID, M.MSGID FROM " 143 +tablePrefix+messageTableName+" M, " 144 +tablePrefix+durableSubAcksTableName +" D " 145 +" WHERE D.CONTAINER=? AND D.SUB=? " 146 +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 147 +" ORDER BY M.ID"; 148 } 149 150 151 public String getRemoveAllMessagesStatment() { 152 return "DELETE FROM "+tablePrefix+messageTableName+" WHERE CONTAINER=?"; 153 } 154 155 public String getRemoveAllSubscriptionsStatment() { 156 return "DELETE FROM "+tablePrefix+durableSubAcksTableName+" WHERE CONTAINER=?"; 157 } 158 159 public String getDeleteOldMessagesStatment() { 160 return "DELETE FROM "+tablePrefix+messageTableName+ 161 " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <= " + 162 "( SELECT MIN("+tablePrefix+durableSubAcksTableName+".LAST_ACKED_ID) " + 163 "FROM "+tablePrefix+durableSubAcksTableName+" WHERE " + 164 tablePrefix+durableSubAcksTableName+".CONTAINER="+tablePrefix+messageTableName+ 165 ".CONTAINER)"; 166 } 167 168 169 /** 170 * @return Returns the containerNameDataType. 171 */ 172 public String getContainerNameDataType() { 173 return containerNameDataType; 174 } 175 /** 176 * @param containerNameDataType The containerNameDataType to set. 177 */ 178 public void setContainerNameDataType(String containerNameDataType) { 179 this.containerNameDataType = containerNameDataType; 180 } 181 /** 182 * @return Returns the messageDataType. 183 */ 184 public String getBinaryDataType() { 185 return binaryDataType; 186 } 187 /** 188 * @param messageDataType The messageDataType to set. 189 */ 190 public void setBinaryDataType(String messageDataType) { 191 this.binaryDataType = messageDataType; 192 } 193 /** 194 * @return Returns the messageTableName. 195 */ 196 public String getMessageTableName() { 197 return messageTableName; 198 } 199 /** 200 * @param messageTableName The messageTableName to set. 201 */ 202 public void setMessageTableName(String messageTableName) { 203 this.messageTableName = messageTableName; 204 } 205 /** 206 * @return Returns the msgIdDataType. 207 */ 208 public String getMsgIdDataType() { 209 return msgIdDataType; 210 } 211 /** 212 * @param msgIdDataType The msgIdDataType to set. 213 */ 214 public void setMsgIdDataType(String msgIdDataType) { 215 this.msgIdDataType = msgIdDataType; 216 } 217 /** 218 * @return Returns the sequenceDataType. 219 */ 220 public String getSequenceDataType() { 221 return sequenceDataType; 222 } 223 /** 224 * @param sequenceDataType The sequenceDataType to set. 225 */ 226 public void setSequenceDataType(String sequenceDataType) { 227 this.sequenceDataType = sequenceDataType; 228 } 229 /** 230 * @return Returns the tablePrefix. 231 */ 232 public String getTablePrefix() { 233 return tablePrefix; 234 } 235 /** 236 * @param tablePrefix The tablePrefix to set. 237 */ 238 public void setTablePrefix(String tablePrefix) { 239 this.tablePrefix = tablePrefix; 240 } 241 /** 242 * @return Returns the txTableName. 243 */ 244 public String getTxTableName() { 245 return txTableName; 246 } 247 /** 248 * @param txTableName The txTableName to set. 249 */ 250 public void setTxTableName(String txTableName) { 251 this.txTableName = txTableName; 252 } 253 /** 254 * @return Returns the xidDataType. 255 */ 256 public String getXidDataType() { 257 return xidDataType; 258 } 259 /** 260 * @param xidDataType The xidDataType to set. 261 */ 262 public void setXidDataType(String xidDataType) { 263 this.xidDataType = xidDataType; 264 } 265 /** 266 * @return Returns the durableSubAcksTableName. 267 */ 268 public String getDurableSubAcksTableName() { 269 return durableSubAcksTableName; 270 } 271 /** 272 * @param durableSubAcksTableName The durableSubAcksTableName to set. 273 */ 274 public void setDurableSubAcksTableName(String durableSubAcksTableName) { 275 this.durableSubAcksTableName = durableSubAcksTableName; 276 } 277 /** 278 * @return Returns the subscriptionIdDataType. 279 */ 280 public String getSubscriptionIdDataType() { 281 return subscriptionIdDataType; 282 } 283 /** 284 * @param subscriptionIdDataType The subscriptionIdDataType to set. 285 */ 286 public void setSubscriptionIdDataType(String subscriptionIdDataType) { 287 this.subscriptionIdDataType = subscriptionIdDataType; 288 } 289 290 public String getLongDataType() { 291 return longDataType; 292 } 293 294 public void setLongDataType(String longDataType) { 295 this.longDataType = longDataType; 296 } 297 298 public String getStringIdDataType() { 299 return stringIdDataType; 300 } 301 302 public void setStringIdDataType(String stringIdDataType) { 303 this.stringIdDataType = stringIdDataType; 304 } 305 306 }