diff --git a/Mycat-Core/src/main/java/io/mycat/backend/MySQLBackendConnection.java b/Mycat-Core/src/main/java/io/mycat/backend/MySQLBackendConnection.java index bfde8a1..41ade0a 100644 --- a/Mycat-Core/src/main/java/io/mycat/backend/MySQLBackendConnection.java +++ b/Mycat-Core/src/main/java/io/mycat/backend/MySQLBackendConnection.java @@ -1,261 +1,374 @@ -/* - * Copyright (c) 2016, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software;Designed and Developed mainly by many Chinese - * opensource volunteers. you can redistribute it and/or modify it under the - * terms of the GNU General Public License version 2 only, as published by the - * Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Any questions about this component can be directed to it's project Web address - * https://code.google.com/p/opencloudb/. - * - */ -package io.mycat.backend; - -import java.io.IOException; -import java.nio.channels.SocketChannel; -import java.security.NoSuchAlgorithmException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.mycat.mysql.Capabilities; -import io.mycat.mysql.MySQLConnection; -import io.mycat.mysql.packet.AuthPacket; -import io.mycat.mysql.packet.HandshakePacket; -import io.mycat.mysql.packet.MySQLPacket; -import io.mycat.net2.Connection; -import io.mycat.util.SecurityUtil; -/** - * backend mysql connection - * @author wuzhihui - * - */ - -public class MySQLBackendConnection extends MySQLConnection { - - public static final Logger LOGGER = LoggerFactory.getLogger(MySQLBackendConnection.class); - private final boolean fromSlaveDB; - private BackConnectionCallback userCallback; - private MySQLDataSource dataSource; - private boolean borrowed; - private String schema; - private HandshakePacket handshake; - - private boolean authenticated; - - private long clientFlags; - - private long maxPacketSize = 16 * 1024 * 1024; - - private int charsetIndex; - - private MySQLDataSource datasource; - - public MySQLBackendConnection(MySQLDataSource datasource,SocketChannel channel) { - super(channel); - this.datasource=datasource; - this.fromSlaveDB=datasource.isSlaveNode(); - this.clientFlags = initClientFlags(); - } - - private static long initClientFlags() { - int flag = 0; - flag |= Capabilities.CLIENT_LONG_PASSWORD; - flag |= Capabilities.CLIENT_FOUND_ROWS; - flag |= Capabilities.CLIENT_LONG_FLAG; - flag |= Capabilities.CLIENT_CONNECT_WITH_DB; - // flag |= Capabilities.CLIENT_NO_SCHEMA; - boolean usingCompress = false; - if (usingCompress) { - flag |= Capabilities.CLIENT_COMPRESS; - } - flag |= Capabilities.CLIENT_ODBC; - flag |= Capabilities.CLIENT_LOCAL_FILES; - flag |= Capabilities.CLIENT_IGNORE_SPACE; - flag |= Capabilities.CLIENT_PROTOCOL_41; - flag |= Capabilities.CLIENT_INTERACTIVE; - // flag |= Capabilities.CLIENT_SSL; - flag |= Capabilities.CLIENT_IGNORE_SIGPIPE; - flag |= Capabilities.CLIENT_TRANSACTIONS; - // flag |= Capabilities.CLIENT_RESERVED; - flag |= Capabilities.CLIENT_SECURE_CONNECTION; - // client extension - flag |= Capabilities.CLIENT_MULTI_STATEMENTS; - flag |= Capabilities.CLIENT_MULTI_RESULTS; - return flag; - } - - public void setNextStatus(byte packetType) { - int status = this.getState(); - switch (status) { - case Connection.STATE_IDLE: - if (packetType == MySQLPacket.COM_QUERY) { - this.setState(RESULT_INIT_STATUS); - LOGGER.debug("DB status: Result Init"); - } else if (packetType == MySQLPacket.COM_QUIT) { - this.setState(Connection.STATE_IDLE); - } - break; - case RESULT_INIT_STATUS: - if (packetType == MySQLPacket.OK_PACKET) { - this.setState(Connection.STATE_IDLE); - LOGGER.debug("DB status: IDLE"); - } else if (packetType == MySQLPacket.ERROR_PACKET) { - this.setState(Connection.STATE_IDLE); - LOGGER.debug("DB status: IDLE"); - } else { - this.setState(RESULT_HEADER_STATUS); - LOGGER.debug("DB status: Result Header"); - } - break; - case RESULT_HEADER_STATUS: - if (packetType == MySQLPacket.EOF_PACKET) { - this.setState(RESULT_FETCH_STATUS); - LOGGER.debug("DB status: Fetch Row"); - } else if (packetType == MySQLPacket.ERROR_PACKET) { - this.setState(Connection.STATE_IDLE); - LOGGER.debug("DB status: IDLE"); - } else { - LOGGER.debug("DB status: Result Field"); - } - break; - case RESULT_FETCH_STATUS: - if (packetType == MySQLPacket.EOF_PACKET) { - this.setState(Connection.STATE_IDLE); - LOGGER.debug("DB status: IDLE"); - } else if (packetType == MySQLPacket.ERROR_PACKET) { - this.setState(Connection.STATE_IDLE); - LOGGER.debug("DB status: IDLE"); - } else { - LOGGER.debug("DB status: Reading row"); - } - break; - case RESULT_FAIL_STATUS: - if (packetType == MySQLPacket.EOF_PACKET) { - this.setState(Connection.STATE_IDLE); - } - break; - default: - LOGGER.warn("Error connected status.", status); - break; - } - } - - - - public BackConnectionCallback getUserCallback() { - return userCallback; - } - - - public void setUserCallback(BackConnectionCallback conCallback) { - this.userCallback = conCallback; - } - - public MySQLDataSource getPool() { - return dataSource; - } - - public void setPool(MySQLDataSource pool) { - this.dataSource = pool; - } - - - public HandshakePacket getHandshake() { - return handshake; - } - - public void setHandshake(HandshakePacket handshake) { - this.handshake = handshake; - } - - public boolean isAuthenticated() { - return authenticated; - } - - public void setAuthenticated(boolean authenticated) { - this.authenticated = authenticated; - } - - public void authenticate() throws IOException { - AuthPacket packet = new AuthPacket(); - packet.packetId = 1; - packet.clientFlags = clientFlags; - packet.maxPacketSize = maxPacketSize; - packet.charsetIndex = this.charsetIndex; - packet.user = user; - try { - packet.password = passwd(password, handshake); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e.getMessage()); - } - packet.database = schema; - System.out.println("auth "+user+" :"+password+"@"+schema); - - // write to connection - this.writeMsqlPackage(packet); - - } - - private static byte[] passwd(String pass, HandshakePacket hs) throws NoSuchAlgorithmException { - if (pass == null || pass.length() == 0) { - return null; - } - byte[] passwd = pass.getBytes(); - int sl1 = hs.seed.length; - int sl2 = hs.restOfScrambleBuff.length; - byte[] seed = new byte[sl1 + sl2]; - System.arraycopy(hs.seed, 0, seed, 0, sl1); - System.arraycopy(hs.restOfScrambleBuff, 0, seed, sl1, sl2); - return SecurityUtil.scramble411(passwd, seed); - } - - - public boolean isBorrowed() { - return false; - } - - - - public boolean isFromSlaveDB() { - return this.fromSlaveDB; - } - - public MySQLDataSource getDatasource() { - return datasource; - } - - public void setDatasource(MySQLDataSource datasource) { - this.datasource = datasource; - } - - public void release() { - this.datasource.releaseChannel(this); - this.borrowed=false; - } - - public void setBorrowed(boolean borrowed) { - this.borrowed = borrowed; - } - public String getSchema() { - return schema; - } - - public void setSchema(String schema) { - this.schema = schema; - } - - -} +/* + * Copyright (c) 2016, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software;Designed and Developed mainly by many Chinese + * opensource volunteers. you can redistribute it and/or modify it under the + * terms of the GNU General Public License version 2 only, as published by the + * Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Any questions about this component can be directed to it's project Web address + * https://code.google.com/p/opencloudb/. + * + */ +package io.mycat.backend; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.security.NoSuchAlgorithmException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mysql.Capabilities; +import io.mycat.mysql.MySQLConnection; +import io.mycat.mysql.Status; +import io.mycat.mysql.packet.AuthPacket; +import io.mycat.mysql.packet.EOFPacket; +import io.mycat.mysql.packet.HandshakePacket; +import io.mycat.mysql.packet.MySQLMessage; +import io.mycat.mysql.packet.MySQLPacket; +import io.mycat.mysql.packet.OkPacket; +import io.mycat.net2.ConDataBuffer; +import io.mycat.net2.Connection; +import io.mycat.util.SecurityUtil; +/** + * backend mysql connection + * @author wuzhihui + * + */ + +public class MySQLBackendConnection extends MySQLConnection { + + public static final Logger LOGGER = LoggerFactory.getLogger(MySQLBackendConnection.class); + private final boolean fromSlaveDB; + private BackConnectionCallback userCallback; + private MySQLDataSource dataSource; + private boolean borrowed; + private String schema; + private HandshakePacket handshake; + + private boolean authenticated; + + private long clientFlags; + + private long maxPacketSize = 16 * 1024 * 1024; + + private int charsetIndex; + + private MySQLDataSource datasource; + + public MySQLBackendConnection(MySQLDataSource datasource,SocketChannel channel) { + super(channel); + this.datasource=datasource; + this.fromSlaveDB=datasource.isSlaveNode(); + this.clientFlags = initClientFlags(); + } + + private static long initClientFlags() { + int flag = 0; + flag |= Capabilities.CLIENT_LONG_PASSWORD; + flag |= Capabilities.CLIENT_FOUND_ROWS; + flag |= Capabilities.CLIENT_LONG_FLAG; + flag |= Capabilities.CLIENT_CONNECT_WITH_DB; + // flag |= Capabilities.CLIENT_NO_SCHEMA; + boolean usingCompress = false; + if (usingCompress) { + flag |= Capabilities.CLIENT_COMPRESS; + } + flag |= Capabilities.CLIENT_ODBC; + flag |= Capabilities.CLIENT_LOCAL_FILES; + flag |= Capabilities.CLIENT_IGNORE_SPACE; + flag |= Capabilities.CLIENT_PROTOCOL_41; + flag |= Capabilities.CLIENT_INTERACTIVE; + // flag |= Capabilities.CLIENT_SSL; + flag |= Capabilities.CLIENT_IGNORE_SIGPIPE; + flag |= Capabilities.CLIENT_TRANSACTIONS; + // flag |= Capabilities.CLIENT_RESERVED; + flag |= Capabilities.CLIENT_SECURE_CONNECTION; + // client extension + flag |= Capabilities.CLIENT_MULTI_STATEMENTS; + flag |= Capabilities.CLIENT_MULTI_RESULTS; + //flag |= Capabilities.CLIENT_DEPRECATE_EOF; + return flag; + } + + public void setNextStatus(ConDataBuffer dataBuffer, byte packetType, int pkgStartPos,int pkgLen) throws IOException { + int status = this.getState(); + ByteBuffer buffer = dataBuffer.getBytes(pkgStartPos, pkgLen); + MySQLMessage msg = new MySQLMessage(buffer); + switch (status) { + case Connection.STATE_IDLE: + if (packetType == MySQLPacket.OK_PACKET){ + OkPacket okPacket = new OkPacket(); + okPacket.read(buffer); + if((okPacket.serverStatus & Status.SERVER_STATUS_IN_TRANS) == 0){ + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + }else{ + this.setState(TRANSACTION); + LOGGER.debug("DB status: TRANSACTION"); + } + } else if (packetType == MySQLPacket.ERROR_PACKET){ + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + } else if (packetType == MySQLPacket.LOAD_DATA_PACKET){ + this.setState(LOAD_DATA); + LOGGER.debug("DB status: LOAD_DATA"); + } else { + msg.move(MySQLPacket.packetHeaderSize); + long columnCount = msg.readLength(); + if(columnCount > 0){ + this.setState(RESULT_INIT_STATUS); + LOGGER.debug("DB status: RESULT_INIT_STATUS , columnCount: {}",columnCount); + }else{ + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + } + }/* else { + LOGGER.debug("DB current status: STATE_IDLE , can't set next status , packetType : {}", packetType); + }*/ + break; + case TRANSACTION: + if (packetType == MySQLPacket.OK_PACKET){ + OkPacket okPacket = new OkPacket(); + okPacket.read(buffer); + if((okPacket.serverStatus & Status.SERVER_STATUS_IN_TRANS) == 0){ + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + }else{ + this.setState(TRANSACTION); + LOGGER.debug("DB status: TRANSACTION"); + } + } else if (packetType == MySQLPacket.EOF_PACKET){ + EOFPacket eofPacket = new EOFPacket(); + eofPacket.read(buffer); + if((eofPacket.status & Status.SERVER_STATUS_IN_TRANS) == 0){ + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + }else{ + this.setState(TRANSACTION); + LOGGER.debug("DB status: TRANSACTION"); + } + } else { + LOGGER.debug("DB status: TRANSACTION"); + } + break; + case LOAD_DATA: + if (packetType == MySQLPacket.OK_PACKET){ + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + } else if (packetType == MySQLPacket.ERROR_PACKET){ + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + } else { + LOGGER.debug("DB current status: LOAD_DATA , can't set next status"); + } + break; + case RESULT_INIT_STATUS: + if (packetType == MySQLPacket.OK_PACKET) { + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + } else if (packetType == MySQLPacket.ERROR_PACKET) { + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + } else { + this.setState(RESULT_HEADER_STATUS); + LOGGER.debug("DB status: RESULT_HEADER_STATUS"); + } + break; + case RESULT_HEADER_STATUS: + if (packetType == MySQLPacket.EOF_PACKET) { + if((this.clientFlags & Capabilities.CLIENT_DEPRECATE_EOF) == 0){//没有设置CLIENT_DEPRECATE_EOF + this.setState(RESULT_HEADER_EOF); + LOGGER.debug("DB status: RESULT_HEADER_EOF"); + } else{ + EOFPacket eofPacket = new EOFPacket(); + eofPacket.read(buffer); + if((eofPacket.status & Status.SERVER_MORE_RESULTS_EXISTS) == 0){//没有更多结果集 + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + }else{ + this.setState(RESULT_MORE); + LOGGER.debug("DB status: RESULT_MORE"); + } + } + } else if (packetType == MySQLPacket.ERROR_PACKET) { + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + } else { + LOGGER.debug("DB status: RESULT_HEADER_STATUS"); + } + break; + case RESULT_HEADER_EOF: + if (packetType == MySQLPacket.EOF_PACKET) { + EOFPacket eofPacket = new EOFPacket(); + eofPacket.read(buffer); + if((eofPacket.status & Status.SERVER_MORE_RESULTS_EXISTS) == 0){//没有更多结果集 + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + }else{ + this.setState(RESULT_MORE); + LOGGER.debug("DB status: RESULT_MORE"); + } + } else { + this.setState(RESULT_FETCH_STATUS); + LOGGER.debug("DB status: RESULT_FETCH_STATUS"); + } + break; + case RESULT_FETCH_STATUS: + if (packetType == MySQLPacket.EOF_PACKET) { + EOFPacket eofPacket = new EOFPacket(); + eofPacket.read(buffer); + if((eofPacket.status & Status.SERVER_MORE_RESULTS_EXISTS) == 0){//没有更多结果集 + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + }else{ + this.setState(RESULT_MORE); + LOGGER.debug("DB status: RESULT_MORE"); + } + } else if (packetType == MySQLPacket.ERROR_PACKET) { + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + } else { + LOGGER.debug("DB status: RESULT_FETCH_STATUS"); + } + break; + case RESULT_MORE: + if (packetType == MySQLPacket.OK_PACKET) { + this.setState(Connection.STATE_IDLE); + } else { + msg.move(MySQLPacket.packetHeaderSize); + long columnCount = msg.readLength(); + if(columnCount > 0){ + this.setState(RESULT_INIT_STATUS); + LOGGER.debug("DB status: RESULT_INIT_STATUS , columnCount: {}",columnCount); + }else{ + this.setState(Connection.STATE_IDLE); + LOGGER.debug("DB status: STATE_IDLE"); + } + } + break; + default: + LOGGER.warn("Error connected status.", status); + break; + } + } + + + + public BackConnectionCallback getUserCallback() { + return userCallback; + } + + + public void setUserCallback(BackConnectionCallback conCallback) { + this.userCallback = conCallback; + } + + public MySQLDataSource getPool() { + return dataSource; + } + + public void setPool(MySQLDataSource pool) { + this.dataSource = pool; + } + + + public HandshakePacket getHandshake() { + return handshake; + } + + public void setHandshake(HandshakePacket handshake) { + this.handshake = handshake; + } + + public boolean isAuthenticated() { + return authenticated; + } + + public void setAuthenticated(boolean authenticated) { + this.authenticated = authenticated; + } + + public void authenticate() throws IOException { + AuthPacket packet = new AuthPacket(); + packet.packetId = 1; + packet.clientFlags = clientFlags; + packet.maxPacketSize = maxPacketSize; + packet.charsetIndex = this.charsetIndex; + packet.user = user; + try { + packet.password = passwd(password, handshake); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e.getMessage()); + } + packet.database = schema; + System.out.println("auth "+user+" :"+password+"@"+schema); + + // write to connection + this.writeMsqlPackage(packet); + + } + + private static byte[] passwd(String pass, HandshakePacket hs) throws NoSuchAlgorithmException { + if (pass == null || pass.length() == 0) { + return null; + } + byte[] passwd = pass.getBytes(); + int sl1 = hs.seed.length; + int sl2 = hs.restOfScrambleBuff.length; + byte[] seed = new byte[sl1 + sl2]; + System.arraycopy(hs.seed, 0, seed, 0, sl1); + System.arraycopy(hs.restOfScrambleBuff, 0, seed, sl1, sl2); + return SecurityUtil.scramble411(passwd, seed); + } + + + public boolean isBorrowed() { + return false; + } + + + + public boolean isFromSlaveDB() { + return this.fromSlaveDB; + } + + public MySQLDataSource getDatasource() { + return datasource; + } + + public void setDatasource(MySQLDataSource datasource) { + this.datasource = datasource; + } + + public void release() { + this.datasource.releaseChannel(this); + this.borrowed=false; + } + + public void setBorrowed(boolean borrowed) { + this.borrowed = borrowed; + } + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } + + +} diff --git a/Mycat-Core/src/main/java/io/mycat/backend/callback/DirectTransTofrontCallBack.java b/Mycat-Core/src/main/java/io/mycat/backend/callback/DirectTransTofrontCallBack.java index 37bb7d9..6ad6e02 100644 --- a/Mycat-Core/src/main/java/io/mycat/backend/callback/DirectTransTofrontCallBack.java +++ b/Mycat-Core/src/main/java/io/mycat/backend/callback/DirectTransTofrontCallBack.java @@ -52,7 +52,7 @@ public void handleResponse(MySQLBackendConnection source, ConDataBuffer dataBuff MySQLFrontConnection frontCon=getFrontCon(source); frontCon.getWriteDataBuffer().putBytes(dataBuffer.getBytes(pkgStartPos, pkgLen)); frontCon.enableWrite(false); - source.setNextStatus(packageType); + source.setNextStatus(dataBuffer,packageType,pkgStartPos,pkgLen); if (source.getState()== Connection.STATE_IDLE) { diff --git a/Mycat-Core/src/main/java/io/mycat/engine/impl/AbstractSchemaSQLCommandHandler.java b/Mycat-Core/src/main/java/io/mycat/engine/impl/AbstractSchemaSQLCommandHandler.java index ddd7dd7..fb21795 100644 --- a/Mycat-Core/src/main/java/io/mycat/engine/impl/AbstractSchemaSQLCommandHandler.java +++ b/Mycat-Core/src/main/java/io/mycat/engine/impl/AbstractSchemaSQLCommandHandler.java @@ -46,6 +46,7 @@ import io.mycat.engine.sqlparser.SQLInfo; import io.mycat.engine.sqlparser.ServerParse; import io.mycat.front.MySQLFrontConnection; +import io.mycat.mysql.MySQLConnection; import io.mycat.mysql.packet.MySQLMessage; import io.mycat.mysql.packet.MySQLPacket; import io.mycat.mysql.packet.OkPacket; @@ -63,20 +64,27 @@ public abstract class AbstractSchemaSQLCommandHandler implements SQLCommandHandl @Override public void processCmd(MySQLFrontConnection frontCon, ConDataBuffer dataBuffer, byte packageType, int pkgStartPos, int pkgLen) throws IOException { - switch (packageType) { - case MySQLPacket.COM_QUIT: - LOGGER.info("Client quit."); - frontCon.close("quit packet"); - break; - case MySQLPacket.COM_INIT_DB: - // Implementation: use database; - // @author little-pan - // @since 2016-09-29 - final ByteBuffer byteBuff = dataBuffer.getBytes(pkgStartPos, pkgLen); - initDb(frontCon, byteBuff); - break; - default: - doSQLCommand(frontCon, dataBuffer, packageType, pkgStartPos, pkgLen); + if(frontCon.getState() == MySQLConnection.LOAD_DATA){//load data 状态下直接传输数据 + passThroughSQL(frontCon, dataBuffer,pkgStartPos, pkgLen); + if(pkgLen == 4){//empty packet 代表传输数据完毕 + frontCon.setState(MySQLConnection.STATE_IDLE); + } + }else{ + switch (packageType) { + case MySQLPacket.COM_QUIT: + LOGGER.info("Client quit."); + frontCon.close("quit packet"); + break; + case MySQLPacket.COM_INIT_DB: + // Implementation: use database; + // @author little-pan + // @since 2016-09-29 + final ByteBuffer byteBuff = dataBuffer.getBytes(pkgStartPos, pkgLen); + initDb(frontCon, byteBuff); + break; + default: + doSQLCommand(frontCon, dataBuffer, packageType, pkgStartPos, pkgLen); + } } } @@ -153,13 +161,43 @@ private void doSQLCommand(MySQLFrontConnection frontCon, ConDataBuffer dataBuffe executeSelectSQL(frontCon, dataBuffer, packageType,pkgStartPos, pkgLen,sql); + // SelectHandler.handle(sql, con, rs >>> 8); + break; + case ServerParse.CALL: + LOGGER.debug("CALL"); + mycatSchema = frontCon.getMycatSchema(); + if (mycatSchema == null) { + frontCon.writeErrMessage(ErrorCode.ER_NO_DB_ERROR, "No database selected"); + return; + } + + executeCallSQL(frontCon, dataBuffer, packageType,pkgStartPos, pkgLen,sql); + // SelectHandler.handle(sql, con, rs >>> 8); break; case ServerParse.START: LOGGER.debug("START"); + mycatSchema = frontCon.getMycatSchema(); + if (mycatSchema == null) { + frontCon.writeErrMessage(ErrorCode.ER_NO_DB_ERROR, "No database selected"); + return; + } + + executeSelectSQL(frontCon, dataBuffer, packageType,pkgStartPos, pkgLen,sql); + + // SelectHandler.handle(sql, con, rs >>> 8); break; case ServerParse.BEGIN: LOGGER.debug("BEGIN"); + mycatSchema = frontCon.getMycatSchema(); + if (mycatSchema == null) { + frontCon.writeErrMessage(ErrorCode.ER_NO_DB_ERROR, "No database selected"); + return; + } + + executeSelectSQL(frontCon, dataBuffer, packageType,pkgStartPos, pkgLen,sql); + + // SelectHandler.handle(sql, con, rs >>> 8); break; case ServerParse.SAVEPOINT: LOGGER.debug("SAVEPOINT"); @@ -182,9 +220,27 @@ private void doSQLCommand(MySQLFrontConnection frontCon, ConDataBuffer dataBuffe return; case ServerParse.COMMIT: LOGGER.debug("COMMIT"); + mycatSchema = frontCon.getMycatSchema(); + if (mycatSchema == null) { + frontCon.writeErrMessage(ErrorCode.ER_NO_DB_ERROR, "No database selected"); + return; + } + + executeSelectSQL(frontCon, dataBuffer, packageType,pkgStartPos, pkgLen,sql); + + // SelectHandler.handle(sql, con, rs >>> 8); break; case ServerParse.ROLLBACK: LOGGER.debug("ROLLBACK"); + mycatSchema = frontCon.getMycatSchema(); + if (mycatSchema == null) { + frontCon.writeErrMessage(ErrorCode.ER_NO_DB_ERROR, "No database selected"); + return; + } + + executeSelectSQL(frontCon, dataBuffer, packageType,pkgStartPos, pkgLen,sql); + + // SelectHandler.handle(sql, con, rs >>> 8); break; case ServerParse.HELP: LOGGER.debug("HELP"); @@ -197,6 +253,16 @@ private void doSQLCommand(MySQLFrontConnection frontCon, ConDataBuffer dataBuffe break; case ServerParse.LOAD_DATA_INFILE_SQL: LOGGER.debug("LOAD_DATA_INFILE_SQL"); + mycatSchema = frontCon.getMycatSchema(); + if (mycatSchema == null) { + frontCon.writeErrMessage(ErrorCode.ER_NO_DB_ERROR, "No database selected"); + return; + } + + executeLoadDataSQL(frontCon, dataBuffer, packageType,pkgStartPos, pkgLen,sql); + frontCon.setState(MySQLConnection.LOAD_DATA); + + // SelectHandler.handle(sql, con, rs >>> 8); break; default: LOGGER.debug("DEFAULT"); @@ -275,4 +341,10 @@ public abstract void executeShowSQL(MySQLFrontConnection frontCon, ConDataBuffer public abstract void executeSelectSQL(MySQLFrontConnection frontCon, ConDataBuffer dataBuffer, byte packageType, int pkgStartPos, int pkgLen,String sql) throws IOException; + + public abstract void executeCallSQL(MySQLFrontConnection frontCon, ConDataBuffer dataBuffer, byte packageType, + int pkgStartPos, int pkgLen,String sql) throws IOException; + + public abstract void executeLoadDataSQL(MySQLFrontConnection frontCon, ConDataBuffer dataBuffer, byte packageType, + int pkgStartPos, int pkgLen,String sql) throws IOException; } diff --git a/Mycat-Core/src/main/java/io/mycat/engine/impl/NormalSchemaSQLCommandHandler.java b/Mycat-Core/src/main/java/io/mycat/engine/impl/NormalSchemaSQLCommandHandler.java index bc98333..7afd5b3 100644 --- a/Mycat-Core/src/main/java/io/mycat/engine/impl/NormalSchemaSQLCommandHandler.java +++ b/Mycat-Core/src/main/java/io/mycat/engine/impl/NormalSchemaSQLCommandHandler.java @@ -60,6 +60,20 @@ public void executeSelectSQL(MySQLFrontConnection frontCon, ConDataBuffer dataBu passThroughSQL(frontCon, dataBuffer, pkgStartPos, pkgLen); } + + @Override + public void executeCallSQL(MySQLFrontConnection frontCon, ConDataBuffer dataBuffer, byte packageType, + int pkgStartPos, int pkgLen, String sql) throws IOException { + passThroughSQL(frontCon, dataBuffer, pkgStartPos, pkgLen); + + } + + @Override + public void executeLoadDataSQL(MySQLFrontConnection frontCon, ConDataBuffer dataBuffer, byte packageType, + int pkgStartPos, int pkgLen, String sql) throws IOException { + passThroughSQL(frontCon, dataBuffer, pkgStartPos, pkgLen); + + } diff --git a/Mycat-Core/src/main/java/io/mycat/engine/impl/PartionSchemaSQLCommandHandler.java b/Mycat-Core/src/main/java/io/mycat/engine/impl/PartionSchemaSQLCommandHandler.java index 711117d..046ce4d 100644 --- a/Mycat-Core/src/main/java/io/mycat/engine/impl/PartionSchemaSQLCommandHandler.java +++ b/Mycat-Core/src/main/java/io/mycat/engine/impl/PartionSchemaSQLCommandHandler.java @@ -65,4 +65,19 @@ public void executeSelectSQL(MySQLFrontConnection frontCon, ConDataBuffer dataBu } + + @Override + public void executeCallSQL(MySQLFrontConnection frontCon, ConDataBuffer dataBuffer, byte packageType, + int pkgStartPos, int pkgLen, String sql) throws IOException { + frontCon.writeErrMessage(ErrorCode.ER_BAD_DB_ERROR, "Not implemented Yet,Welcome to be a Commiter ,Leader want you !!"); + } + + + @Override + public void executeLoadDataSQL(MySQLFrontConnection frontCon, ConDataBuffer dataBuffer, byte packageType, + int pkgStartPos, int pkgLen, String sql) throws IOException { + frontCon.writeErrMessage(ErrorCode.ER_BAD_DB_ERROR, "Not implemented Yet,Welcome to be a Commiter ,Leader want you !!"); + + } + } diff --git a/Mycat-Core/src/main/java/io/mycat/mysql/Capabilities.java b/Mycat-Core/src/main/java/io/mycat/mysql/Capabilities.java index 450e267..9db3cd2 100644 --- a/Mycat-Core/src/main/java/io/mycat/mysql/Capabilities.java +++ b/Mycat-Core/src/main/java/io/mycat/mysql/Capabilities.java @@ -1,112 +1,115 @@ -/* - * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software;Designed and Developed mainly by many Chinese - * opensource volunteers. you can redistribute it and/or modify it under the - * terms of the GNU General Public License version 2 only, as published by the - * Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Any questions about this component can be directed to it's project Web address - * https://code.google.com/p/opencloudb/. - * - */ -package io.mycat.mysql; - -/** - * 处理能力标识定义 - * - * @author mycat - */ -public interface Capabilities { - - /** - * server capabilities - * - *
-     * server:        11110111 11111111
-     * client_cmd: 11 10100110 10000101
-     * client_jdbc:10 10100010 10001111
-     *  
-     * @see http://dev.mysql.com/doc/refman/5.1/en/mysql-real-connect.html
-     * 
- */ - // new more secure passwords - public static final int CLIENT_LONG_PASSWORD = 1; - - // Found instead of affected rows - // 返回找到(匹配)的行数,而不是改变了的行数。 - public static final int CLIENT_FOUND_ROWS = 2; - - // Get all column flags - public static final int CLIENT_LONG_FLAG = 4; - - // One can specify db on connect - public static final int CLIENT_CONNECT_WITH_DB = 8; - - // Don't allow database.table.column - // 不允许“数据库名.表名.列名”这样的语法。这是对于ODBC的设置。 - // 当使用这样的语法时解析器会产生一个错误,这对于一些ODBC的程序限制bug来说是有用的。 - public static final int CLIENT_NO_SCHEMA = 16; - - // Can use compression protocol - // 使用压缩协议 - public static final int CLIENT_COMPRESS = 32; - - // Odbc client - public static final int CLIENT_ODBC = 64; - - // Can use LOAD DATA LOCAL - public static final int CLIENT_LOCAL_FILES = 128; - - // Ignore spaces before '(' - // 允许在函数名后使用空格。所有函数名可以预留字。 - public static final int CLIENT_IGNORE_SPACE = 256; - - // New 4.1 protocol This is an interactive client - public static final int CLIENT_PROTOCOL_41 = 512; - - // This is an interactive client - // 允许使用关闭连接之前的不活动交互超时的描述,而不是等待超时秒数。 - // 客户端的会话等待超时变量变为交互超时变量。 - public static final int CLIENT_INTERACTIVE = 1024; - - // Switch to SSL after handshake - // 使用SSL。这个设置不应该被应用程序设置,他应该是在客户端库内部是设置的。 - // 可以在调用mysql_real_connect()之前调用mysql_ssl_set()来代替设置。 - public static final int CLIENT_SSL = 2048; - - // IGNORE sigpipes - // 阻止客户端库安装一个SIGPIPE信号处理器。 - // 这个可以用于当应用程序已经安装该处理器的时候避免与其发生冲突。 - public static final int CLIENT_IGNORE_SIGPIPE = 4096; - - // Client knows about transactions - public static final int CLIENT_TRANSACTIONS = 8192; - - // Old flag for 4.1 protocol - public static final int CLIENT_RESERVED = 16384; - - // New 4.1 authentication - public static final int CLIENT_SECURE_CONNECTION = 32768; - - // Enable/disable multi-stmt support - // 通知服务器客户端可以发送多条语句(由分号分隔)。如果该标志为没有被设置,多条语句执行。 - public static final int CLIENT_MULTI_STATEMENTS = 65536; - - // Enable/disable multi-results - // 通知服务器客户端可以处理由多语句或者存储过程执行生成的多结果集。 - // 当打开CLIENT_MULTI_STATEMENTS时,这个标志自动的被打开。 - public static final int CLIENT_MULTI_RESULTS = 131072; - +/* + * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software;Designed and Developed mainly by many Chinese + * opensource volunteers. you can redistribute it and/or modify it under the + * terms of the GNU General Public License version 2 only, as published by the + * Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Any questions about this component can be directed to it's project Web address + * https://code.google.com/p/opencloudb/. + * + */ +package io.mycat.mysql; + +/** + * 处理能力标识定义 + * + * @author mycat + */ +public interface Capabilities { + + /** + * server capabilities + * + *
+     * server:        11110111 11111111
+     * client_cmd: 11 10100110 10000101
+     * client_jdbc:10 10100010 10001111
+     *  
+     * @see http://dev.mysql.com/doc/refman/5.1/en/mysql-real-connect.html
+     * 
+ */ + // new more secure passwords + public static final int CLIENT_LONG_PASSWORD = 1; + + // Found instead of affected rows + // 返回找到(匹配)的行数,而不是改变了的行数。 + public static final int CLIENT_FOUND_ROWS = 2; + + // Get all column flags + public static final int CLIENT_LONG_FLAG = 4; + + // One can specify db on connect + public static final int CLIENT_CONNECT_WITH_DB = 8; + + // Don't allow database.table.column + // 不允许“数据库名.表名.列名”这样的语法。这是对于ODBC的设置。 + // 当使用这样的语法时解析器会产生一个错误,这对于一些ODBC的程序限制bug来说是有用的。 + public static final int CLIENT_NO_SCHEMA = 16; + + // Can use compression protocol + // 使用压缩协议 + public static final int CLIENT_COMPRESS = 32; + + // Odbc client + public static final int CLIENT_ODBC = 64; + + // Can use LOAD DATA LOCAL + public static final int CLIENT_LOCAL_FILES = 128; + + // Ignore spaces before '(' + // 允许在函数名后使用空格。所有函数名可以预留字。 + public static final int CLIENT_IGNORE_SPACE = 256; + + // New 4.1 protocol This is an interactive client + public static final int CLIENT_PROTOCOL_41 = 512; + + // This is an interactive client + // 允许使用关闭连接之前的不活动交互超时的描述,而不是等待超时秒数。 + // 客户端的会话等待超时变量变为交互超时变量。 + public static final int CLIENT_INTERACTIVE = 1024; + + // Switch to SSL after handshake + // 使用SSL。这个设置不应该被应用程序设置,他应该是在客户端库内部是设置的。 + // 可以在调用mysql_real_connect()之前调用mysql_ssl_set()来代替设置。 + public static final int CLIENT_SSL = 2048; + + // IGNORE sigpipes + // 阻止客户端库安装一个SIGPIPE信号处理器。 + // 这个可以用于当应用程序已经安装该处理器的时候避免与其发生冲突。 + public static final int CLIENT_IGNORE_SIGPIPE = 4096; + + // Client knows about transactions + public static final int CLIENT_TRANSACTIONS = 8192; + + // Old flag for 4.1 protocol + public static final int CLIENT_RESERVED = 16384; + + // New 4.1 authentication + public static final int CLIENT_SECURE_CONNECTION = 32768; + + // Enable/disable multi-stmt support + // 通知服务器客户端可以发送多条语句(由分号分隔)。如果该标志为没有被设置,多条语句执行。 + public static final int CLIENT_MULTI_STATEMENTS = 65536; + + // Enable/disable multi-results + // 通知服务器客户端可以处理由多语句或者存储过程执行生成的多结果集。 + // 当打开CLIENT_MULTI_STATEMENTS时,这个标志自动的被打开。 + public static final int CLIENT_MULTI_RESULTS = 131072; + + // 通知服务器客户端丢弃EOF报文 + public static final int CLIENT_DEPRECATE_EOF = 0x01000000; + } \ No newline at end of file diff --git a/Mycat-Core/src/main/java/io/mycat/mysql/MySQLConnection.java b/Mycat-Core/src/main/java/io/mycat/mysql/MySQLConnection.java index 71daf5f..43e5449 100644 --- a/Mycat-Core/src/main/java/io/mycat/mysql/MySQLConnection.java +++ b/Mycat-Core/src/main/java/io/mycat/mysql/MySQLConnection.java @@ -1,199 +1,203 @@ -/* - * Copyright (c) 2016, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software;Designed and Developed mainly by many Chinese - * opensource volunteers. you can redistribute it and/or modify it under the - * terms of the GNU General Public License version 2 only, as published by the - * Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Any questions about this component can be directed to it's project Web address - * https://code.google.com/p/opencloudb/. - * - */ -package io.mycat.mysql; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.mycat.mysql.packet.ErrorPacket; -import io.mycat.mysql.packet.HandshakePacket; -import io.mycat.mysql.packet.MySQLPacket; -import io.mycat.net2.ConDataBuffer; -import io.mycat.net2.Connection; -import io.mycat.util.RandomUtil; -/** - * Mysql connection - * @author wuzhihui - * - */ -public class MySQLConnection extends Connection { - - protected final static Logger LOGGER = LoggerFactory.getLogger(MySQLConnection.class); - - public static final int CMD_QUERY_STATUS = 11; - public static final int RESULT_WAIT_STATUS = 21; - public static final int RESULT_INIT_STATUS = 22; - public static final int RESULT_FETCH_STATUS = 23; - public static final int RESULT_HEADER_STATUS = 24; - public static final int RESULT_FAIL_STATUS = 29; - - public final static int msyql_packetHeaderSize = 4; - public final static int mysql_packetTypeSize = 1; - - protected String user; - protected String password; - protected String charset; - protected int charsetIndex; - protected byte[] seed; - - public MySQLConnection(SocketChannel channel) { - super(channel); - } - - public static final boolean validateHeader(final long offset, final long position) { - //return offset + msyql_packetHeaderSize + mysql_packetTypeSize <= position; - // ------------------------------------------------------------------------ - // fixbug: can't pass when an empty packet comes, so we should exclude "mysql_packetTypeSize" - // @author little-pan - // @since 2016-09-29 - return (position >= (offset + msyql_packetHeaderSize)); - } - - /** - * 获取报文长度 - * - * @param buffer - * 报文buffer - * @param offset - * buffer解析位置偏移量 - * @param position - * buffer已读位置偏移量 - * @return 报文长度(Header长度+内容长度) - * @throws IOException - */ - public static final int getPacketLength(ConDataBuffer buffer, int offset) throws IOException { - int length = buffer.getByte(offset) & 0xff; - length |= (buffer.getByte(++offset) & 0xff) << 8; - length |= (buffer.getByte(++offset) & 0xff) << 16; - return length + msyql_packetHeaderSize; - } - - protected int getServerCapabilities() { - int flag = 0; - flag |= Capabilities.CLIENT_LONG_PASSWORD; - flag |= Capabilities.CLIENT_FOUND_ROWS; - flag |= Capabilities.CLIENT_LONG_FLAG; - flag |= Capabilities.CLIENT_CONNECT_WITH_DB; - // flag |= Capabilities.CLIENT_NO_SCHEMA; - // boolean usingCompress = MycatServer.getInstance().getConfig() - // .getSystem().getUseCompression() == 1; - // if (usingCompress) { - // flag |= Capabilities.CLIENT_COMPRESS; - // } - flag |= Capabilities.CLIENT_ODBC; - flag |= Capabilities.CLIENT_LOCAL_FILES; - flag |= Capabilities.CLIENT_IGNORE_SPACE; - flag |= Capabilities.CLIENT_PROTOCOL_41; - flag |= Capabilities.CLIENT_INTERACTIVE; - // flag |= Capabilities.CLIENT_SSL; - flag |= Capabilities.CLIENT_IGNORE_SIGPIPE; - flag |= Capabilities.CLIENT_TRANSACTIONS; - // flag |= ServerDefs.CLIENT_RESERVED; - flag |= Capabilities.CLIENT_SECURE_CONNECTION; - return flag; - } - - public void sendAuthPackge() throws IOException { - // 生成认证数据 - byte[] rand1 = RandomUtil.randomBytes(8); - byte[] rand2 = RandomUtil.randomBytes(12); - - // 保存认证数据 - byte[] seed = new byte[rand1.length + rand2.length]; - System.arraycopy(rand1, 0, seed, 0, rand1.length); - System.arraycopy(rand2, 0, seed, rand1.length, rand2.length); - this.seed = seed; - - // 发送握手数据包 - HandshakePacket hs = new HandshakePacket(); - hs.packetId = 0; - hs.protocolVersion = Versions.PROTOCOL_VERSION; - hs.serverVersion = Versions.SERVER_VERSION; - hs.threadId = id; - hs.seed = rand1; - hs.serverCapabilities = getServerCapabilities(); - // hs.serverCharsetIndex = (byte) (charsetIndex & 0xff); - hs.serverStatus = 2; - hs.restOfScrambleBuff = rand2; - this.writeMsqlPackage(hs); - // asynread response - // this.asynRead(); - } - - public void failure(final int errno, final String info){ - try { - LOGGER.warn("errno = {}, info = {}, {}", errno, info, this); - writeErrMessage(errno, info); - } catch (final IOException e) { - this.close(e.toString()); - } - } - - - public void writeMsqlPackage(MySQLPacket pkg) throws IOException - { - int pkgSize=pkg.calcPacketSize(); - ByteBuffer buf=getWriteDataBuffer().beginWrite(pkgSize+MySQLPacket.packetHeaderSize); - pkg.write(buf,pkgSize); - getWriteDataBuffer().endWrite(buf); - this.enableWrite(true); - } - - public void writeErrMessage(int errno, String info) throws IOException { - ErrorPacket err = new ErrorPacket(); - err.packetId = 1; - err.errno = errno; - err.message = info.getBytes(); - this.writeMsqlPackage(err); - } - public String getUser() { - return user; - } - - public void setUser(String user) { - this.user = user; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - - public String getCharset() { - return charset; - } - public void setCharset(int charsetIndex,String charsetName) - { - this.charsetIndex=charsetIndex; - this.charset=charsetName; - } -} +/* + * Copyright (c) 2016, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software;Designed and Developed mainly by many Chinese + * opensource volunteers. you can redistribute it and/or modify it under the + * terms of the GNU General Public License version 2 only, as published by the + * Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Any questions about this component can be directed to it's project Web address + * https://code.google.com/p/opencloudb/. + * + */ +package io.mycat.mysql; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mysql.packet.ErrorPacket; +import io.mycat.mysql.packet.HandshakePacket; +import io.mycat.mysql.packet.MySQLPacket; +import io.mycat.net2.ConDataBuffer; +import io.mycat.net2.Connection; +import io.mycat.util.RandomUtil; +/** + * Mysql connection + * @author wuzhihui + * + */ +public class MySQLConnection extends Connection { + + protected final static Logger LOGGER = LoggerFactory.getLogger(MySQLConnection.class); + + public static final int CMD_QUERY_STATUS = 11; + public static final int RESULT_WAIT_STATUS = 21; + public static final int RESULT_INIT_STATUS = 22; + public static final int RESULT_FETCH_STATUS = 23; + public static final int RESULT_HEADER_STATUS = 24; + public static final int RESULT_HEADER_EOF = 25; + public static final int RESULT_MORE = 26; + public static final int LOAD_DATA = 27; + public static final int TRANSACTION = 28; + public static final int RESULT_FAIL_STATUS = 29; + + public final static int msyql_packetHeaderSize = 4; + public final static int mysql_packetTypeSize = 1; + + protected String user; + protected String password; + protected String charset; + protected int charsetIndex; + protected byte[] seed; + + public MySQLConnection(SocketChannel channel) { + super(channel); + } + + public static final boolean validateHeader(final long offset, final long position) { + //return offset + msyql_packetHeaderSize + mysql_packetTypeSize <= position; + // ------------------------------------------------------------------------ + // fixbug: can't pass when an empty packet comes, so we should exclude "mysql_packetTypeSize" + // @author little-pan + // @since 2016-09-29 + return (position >= (offset + msyql_packetHeaderSize)); + } + + /** + * 获取报文长度 + * + * @param buffer + * 报文buffer + * @param offset + * buffer解析位置偏移量 + * @param position + * buffer已读位置偏移量 + * @return 报文长度(Header长度+内容长度) + * @throws IOException + */ + public static final int getPacketLength(ConDataBuffer buffer, int offset) throws IOException { + int length = buffer.getByte(offset) & 0xff; + length |= (buffer.getByte(++offset) & 0xff) << 8; + length |= (buffer.getByte(++offset) & 0xff) << 16; + return length + msyql_packetHeaderSize; + } + + protected int getServerCapabilities() { + int flag = 0; + flag |= Capabilities.CLIENT_LONG_PASSWORD; + flag |= Capabilities.CLIENT_FOUND_ROWS; + flag |= Capabilities.CLIENT_LONG_FLAG; + flag |= Capabilities.CLIENT_CONNECT_WITH_DB; + // flag |= Capabilities.CLIENT_NO_SCHEMA; + // boolean usingCompress = MycatServer.getInstance().getConfig() + // .getSystem().getUseCompression() == 1; + // if (usingCompress) { + // flag |= Capabilities.CLIENT_COMPRESS; + // } + flag |= Capabilities.CLIENT_ODBC; + flag |= Capabilities.CLIENT_LOCAL_FILES; + flag |= Capabilities.CLIENT_IGNORE_SPACE; + flag |= Capabilities.CLIENT_PROTOCOL_41; + flag |= Capabilities.CLIENT_INTERACTIVE; + // flag |= Capabilities.CLIENT_SSL; + flag |= Capabilities.CLIENT_IGNORE_SIGPIPE; + flag |= Capabilities.CLIENT_TRANSACTIONS; + // flag |= ServerDefs.CLIENT_RESERVED; + flag |= Capabilities.CLIENT_SECURE_CONNECTION; + return flag; + } + + public void sendAuthPackge() throws IOException { + // 生成认证数据 + byte[] rand1 = RandomUtil.randomBytes(8); + byte[] rand2 = RandomUtil.randomBytes(12); + + // 保存认证数据 + byte[] seed = new byte[rand1.length + rand2.length]; + System.arraycopy(rand1, 0, seed, 0, rand1.length); + System.arraycopy(rand2, 0, seed, rand1.length, rand2.length); + this.seed = seed; + + // 发送握手数据包 + HandshakePacket hs = new HandshakePacket(); + hs.packetId = 0; + hs.protocolVersion = Versions.PROTOCOL_VERSION; + hs.serverVersion = Versions.SERVER_VERSION; + hs.threadId = id; + hs.seed = rand1; + hs.serverCapabilities = getServerCapabilities(); + // hs.serverCharsetIndex = (byte) (charsetIndex & 0xff); + hs.serverStatus = 2; + hs.restOfScrambleBuff = rand2; + this.writeMsqlPackage(hs); + // asynread response + // this.asynRead(); + } + + public void failure(final int errno, final String info){ + try { + LOGGER.warn("errno = {}, info = {}, {}", errno, info, this); + writeErrMessage(errno, info); + } catch (final IOException e) { + this.close(e.toString()); + } + } + + + public void writeMsqlPackage(MySQLPacket pkg) throws IOException + { + int pkgSize=pkg.calcPacketSize(); + ByteBuffer buf=getWriteDataBuffer().beginWrite(pkgSize+MySQLPacket.packetHeaderSize); + pkg.write(buf,pkgSize); + getWriteDataBuffer().endWrite(buf); + this.enableWrite(true); + } + + public void writeErrMessage(int errno, String info) throws IOException { + ErrorPacket err = new ErrorPacket(); + err.packetId = 1; + err.errno = errno; + err.message = info.getBytes(); + this.writeMsqlPackage(err); + } + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + + public String getCharset() { + return charset; + } + public void setCharset(int charsetIndex,String charsetName) + { + this.charsetIndex=charsetIndex; + this.charset=charsetName; + } +} diff --git a/Mycat-Core/src/main/java/io/mycat/mysql/Status.java b/Mycat-Core/src/main/java/io/mycat/mysql/Status.java new file mode 100644 index 0000000..e7458aa --- /dev/null +++ b/Mycat-Core/src/main/java/io/mycat/mysql/Status.java @@ -0,0 +1,53 @@ +package io.mycat.mysql; + +/** + * https://dev.mysql.com/doc/internals/en/status-flags.html + * @author zhanghaojia + * + */ +public interface Status { + + + /** + * a transaction is active + */ + public static int SERVER_STATUS_IN_TRANS = 0x0001; + + /** + * auto-commit is enabled + */ + public static int SERVER_STATUS_AUTOCOMMIT = 0x0002; + + public static int SERVER_MORE_RESULTS_EXISTS = 0x0008; + + public static int SERVER_STATUS_NO_GOOD_INDEX_USED = 0x0010; + + public static int SERVER_STATUS_NO_INDEX_USED = 0x0020; + + /** + * Used by Binary Protocol Resultset to signal that COM_STMT_FETCH must be used to fetch the row-data. + */ + public static int SERVER_STATUS_CURSOR_EXISTS = 0x0040; + + public static int SERVER_STATUS_LAST_ROW_SENT = 0x0080; + + public static int SERVER_STATUS_DB_DROPPED = 0x0100; + + public static int SERVER_STATUS_NO_BACKSLASH_ESCAPES = 0x0200; + + public static int SERVER_STATUS_METADATA_CHANGED = 0x0400; + + public static int SERVER_QUERY_WAS_SLOW = 0x0800; + + public static int SERVER_PS_OUT_PARAMS = 0x1000; + + /** + * in a read-only transaction + */ + public static int SERVER_STATUS_IN_TRANS_READONLY = 0x2000; + + /** + * connection state information has changed + */ + public static int SERVER_SESSION_STATE_CHANGED = 0x4000; +} diff --git a/Mycat-Core/src/main/java/io/mycat/mysql/packet/MySQLPacket.java b/Mycat-Core/src/main/java/io/mycat/mysql/packet/MySQLPacket.java index d692230..a2d7691 100644 --- a/Mycat-Core/src/main/java/io/mycat/mysql/packet/MySQLPacket.java +++ b/Mycat-Core/src/main/java/io/mycat/mysql/packet/MySQLPacket.java @@ -1,224 +1,225 @@ -/* - * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software;Designed and Developed mainly by many Chinese - * opensource volunteers. you can redistribute it and/or modify it under the - * terms of the GNU General Public License version 2 only, as published by the - * Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Any questions about this component can be directed to it's project Web address - * https://code.google.com/p/opencloudb/. - * - */ -package io.mycat.mysql.packet; - -import java.nio.ByteBuffer; -/** - * - * @author wuzhihui - * - */ -public abstract class MySQLPacket { - - public static int packetHeaderSize = 4; - - // 后端报文类型 - public static final byte REQUEST_FILE_FIELD_COUNT = (byte) 251; - public static final byte OK_PACKET = 0; - public static final byte ERROR_PACKET = (byte) 0xFF; - public static final byte EOF_PACKET = (byte) 0xFE; - public static final byte FIELD_EOF_PACKET = (byte) 0xFE; - public static final byte ROW_EOF_PACKET = (byte) 0xFE; - public static final byte AUTH_PACKET = 1; - public static final byte QUIT_PACKET = 2; - - // 前端报文类型 - /** - * none, this is an internal thread state - */ - public static final byte COM_SLEEP = 0; - - /** - * mysql_close - */ - public static final byte COM_QUIT = 1; - - public static final int COM_QUIT_PACKET_LENGTH = 1; - - /** - * mysql_select_db - */ - public static final byte COM_INIT_DB = 2; - - /** - * mysql_real_query - */ - public static final byte COM_QUERY = 3; - - /** - * mysql_list_fields - */ - public static final byte COM_FIELD_LIST = 4; - - /** - * mysql_create_db (deprecated) - */ - public static final byte COM_CREATE_DB = 5; - - /** - * mysql_drop_db (deprecated) - */ - public static final byte COM_DROP_DB = 6; - - /** - * mysql_refresh - */ - public static final byte COM_REFRESH = 7; - - /** - * mysql_shutdown - */ - public static final byte COM_SHUTDOWN = 8; - - /** - * mysql_stat - */ - public static final byte COM_STATISTICS = 9; - - /** - * mysql_list_processes - */ - public static final byte COM_PROCESS_INFO = 10; - - /** - * none, this is an internal thread state - */ - public static final byte COM_CONNECT = 11; - - /** - * mysql_kill - */ - public static final byte COM_PROCESS_KILL = 12; - - /** - * mysql_dump_debug_info - */ - public static final byte COM_DEBUG = 13; - - /** - * mysql_ping - */ - public static final byte COM_PING = 14; - - /** - * none, this is an internal thread state - */ - public static final byte COM_TIME = 15; - - /** - * none, this is an internal thread state - */ - public static final byte COM_DELAYED_INSERT = 16; - - /** - * mysql_change_user - */ - public static final byte COM_CHANGE_USER = 17; - - /** - * used by slave server mysqlbinlog - */ - public static final byte COM_BINLOG_DUMP = 18; - - /** - * used by slave server to get master table - */ - public static final byte COM_TABLE_DUMP = 19; - - /** - * used by slave to log connection to master - */ - public static final byte COM_CONNECT_OUT = 20; - - /** - * used by slave to register to master - */ - public static final byte COM_REGISTER_SLAVE = 21; - - /** - * mysql_stmt_prepare - */ - public static final byte COM_STMT_PREPARE = 22; - - /** - * mysql_stmt_execute - */ - public static final byte COM_STMT_EXECUTE = 23; - - /** - * mysql_stmt_send_long_data - */ - public static final byte COM_STMT_SEND_LONG_DATA = 24; - - /** - * mysql_stmt_close - */ - public static final byte COM_STMT_CLOSE = 25; - - /** - * mysql_stmt_reset - */ - public static final byte COM_STMT_RESET = 26; - - /** - * mysql_set_server_option - */ - public static final byte COM_SET_OPTION = 27; - - /** - * mysql_stmt_fetch - */ - public static final byte COM_STMT_FETCH = 28; - - /** - * Mycat heartbeat - */ - public static final byte COM_HEARTBEAT = 64; - - - public int packetLength; - public byte packetId; - - /** - * 计算数据包大小,不包含包头长度。 - */ - public abstract int calcPacketSize(); - - /** - * 取得数据包信息 - */ - protected abstract String getPacketInfo(); - - @Override - public String toString() { - return new StringBuilder().append(getPacketInfo()).append("{length=") - .append(packetLength).append(",id=").append(packetId) - .append('}').toString(); - } - - public void write(ByteBuffer buffer,int pkgSize) { - - - } -} +/* + * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software;Designed and Developed mainly by many Chinese + * opensource volunteers. you can redistribute it and/or modify it under the + * terms of the GNU General Public License version 2 only, as published by the + * Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Any questions about this component can be directed to it's project Web address + * https://code.google.com/p/opencloudb/. + * + */ +package io.mycat.mysql.packet; + +import java.nio.ByteBuffer; +/** + * + * @author wuzhihui + * + */ +public abstract class MySQLPacket { + + public static int packetHeaderSize = 4; + + // 后端报文类型 + public static final byte REQUEST_FILE_FIELD_COUNT = (byte) 251; + public static final byte OK_PACKET = 0; + public static final byte ERROR_PACKET = (byte) 0xFF; + public static final byte LOAD_DATA_PACKET = (byte)0xFB; + public static final byte EOF_PACKET = (byte) 0xFE; + public static final byte FIELD_EOF_PACKET = (byte) 0xFE; + public static final byte ROW_EOF_PACKET = (byte) 0xFE; + public static final byte AUTH_PACKET = 1; + public static final byte QUIT_PACKET = 2; + + // 前端报文类型 + /** + * none, this is an internal thread state + */ + public static final byte COM_SLEEP = 0; + + /** + * mysql_close + */ + public static final byte COM_QUIT = 1; + + public static final int COM_QUIT_PACKET_LENGTH = 1; + + /** + * mysql_select_db + */ + public static final byte COM_INIT_DB = 2; + + /** + * mysql_real_query + */ + public static final byte COM_QUERY = 3; + + /** + * mysql_list_fields + */ + public static final byte COM_FIELD_LIST = 4; + + /** + * mysql_create_db (deprecated) + */ + public static final byte COM_CREATE_DB = 5; + + /** + * mysql_drop_db (deprecated) + */ + public static final byte COM_DROP_DB = 6; + + /** + * mysql_refresh + */ + public static final byte COM_REFRESH = 7; + + /** + * mysql_shutdown + */ + public static final byte COM_SHUTDOWN = 8; + + /** + * mysql_stat + */ + public static final byte COM_STATISTICS = 9; + + /** + * mysql_list_processes + */ + public static final byte COM_PROCESS_INFO = 10; + + /** + * none, this is an internal thread state + */ + public static final byte COM_CONNECT = 11; + + /** + * mysql_kill + */ + public static final byte COM_PROCESS_KILL = 12; + + /** + * mysql_dump_debug_info + */ + public static final byte COM_DEBUG = 13; + + /** + * mysql_ping + */ + public static final byte COM_PING = 14; + + /** + * none, this is an internal thread state + */ + public static final byte COM_TIME = 15; + + /** + * none, this is an internal thread state + */ + public static final byte COM_DELAYED_INSERT = 16; + + /** + * mysql_change_user + */ + public static final byte COM_CHANGE_USER = 17; + + /** + * used by slave server mysqlbinlog + */ + public static final byte COM_BINLOG_DUMP = 18; + + /** + * used by slave server to get master table + */ + public static final byte COM_TABLE_DUMP = 19; + + /** + * used by slave to log connection to master + */ + public static final byte COM_CONNECT_OUT = 20; + + /** + * used by slave to register to master + */ + public static final byte COM_REGISTER_SLAVE = 21; + + /** + * mysql_stmt_prepare + */ + public static final byte COM_STMT_PREPARE = 22; + + /** + * mysql_stmt_execute + */ + public static final byte COM_STMT_EXECUTE = 23; + + /** + * mysql_stmt_send_long_data + */ + public static final byte COM_STMT_SEND_LONG_DATA = 24; + + /** + * mysql_stmt_close + */ + public static final byte COM_STMT_CLOSE = 25; + + /** + * mysql_stmt_reset + */ + public static final byte COM_STMT_RESET = 26; + + /** + * mysql_set_server_option + */ + public static final byte COM_SET_OPTION = 27; + + /** + * mysql_stmt_fetch + */ + public static final byte COM_STMT_FETCH = 28; + + /** + * Mycat heartbeat + */ + public static final byte COM_HEARTBEAT = 64; + + + public int packetLength; + public byte packetId; + + /** + * 计算数据包大小,不包含包头长度。 + */ + public abstract int calcPacketSize(); + + /** + * 取得数据包信息 + */ + protected abstract String getPacketInfo(); + + @Override + public String toString() { + return new StringBuilder().append(getPacketInfo()).append("{length=") + .append(packetLength).append(",id=").append(packetId) + .append('}').toString(); + } + + public void write(ByteBuffer buffer,int pkgSize) { + + + } +}