Chinaunix首页 | 论坛 | 博客
  • 博客访问: 31302
  • 博文数量: 6
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 91
  • 用 户 组: 普通用户
  • 注册时间: 2014-10-19 20:11
文章分类

全部博文(6)

文章存档

2016年(1)

2014年(5)

我的朋友

分类: Java

2014-10-21 21:04:05

我们先说说上篇结尾的思考题,如果各个命令/请求参数信息截然不同的话,那都用一个Result来封装,我觉得是不太妥当的。如果差异这么大,我觉得还不如用不同的类来表示不同的命令/请求,就像在web中,有不同的HttpServletRequest的实现。这HSQLDB里,请求响应都是用Result表示,而在web里则分别用HttpServletRequest、HttpServletResponse表示。

这篇我们将继续深入阅读源码,了解命令的处理过程。

点击(此处)折叠或打开
  1. while (keepAlive) {
  2.                     msgType = dataInput.readByte();

  3.                     if (msgType < ResultConstants.MODE_UPPER_LIMIT) {
  4.                         receiveResult(msgType);
  5.                     } else {
  6.                         receiveOdbcPacket((char) msgType);
  7.                     }
  8.                 }
这是在ServerConnection.run方法中一个片段,这里是命令处理的主要地方。我们这里仍然只分析HSQL_STREAM_PROTOCOL的处理过程。
进入到receiveResult方法中一探究竟:

点击(此处)折叠或打开

  1. private void receiveResult(int resultMode) throws CleanExit, IOException {

  2.         boolean terminate = false;
  3.         Result resultIn = Result.newResult(session, resultMode, dataInput,
  4.                                            rowIn);

  5.         resultIn.readLobResults(session, dataInput, rowIn);
  6.         server.printRequest(mThread, resultIn);

  7.         Result resultOut = null;

  8.         switch (resultIn.getType()) {

  9.             case ResultConstants.CONNECT : {
  10.                 resultOut = setDatabase(resultIn);

  11.                 break;
  12.             }
  13.             case ResultConstants.DISCONNECT : {
  14.                 resultOut = Result.updateZeroResult;
  15.                 terminate = true;

  16.                 break;
  17.             }
  18.             case ResultConstants.RESETSESSION : {
  19.                 session.resetSession();

  20.                 resultOut = Result.updateZeroResult;

  21.                 break;
  22.             }
  23.             case ResultConstants.EXECUTE_INVALID : {
  24.                 resultOut =
  25.                     Result.newErrorResult(Error.error(ErrorCode.X_07502));

  26.                 break;
  27.             }
  28.             default : {
  29.                 resultOut = session.execute(resultIn);

  30.                 break;
  31.             }
  32.         }

  33.         resultOut.write(session, dataOutput, rowOut);
  34.         rowOut.reset(mainBuffer);
  35.         rowIn.resetRow(mainBuffer.length);

  36.         if (terminate) {
  37.             throw cleanExit;
  38.         }
  39.     }
我们看其它类型的处理都比较简单,我们要关注就是这个default的处理,继续进入到session.execute方法中。

点击(此处)折叠或打开

  1. public synchronized Result execute(Result cmd) {

  2.         if (isClosed) {
  3.             return Result.newErrorResult(Error.error(ErrorCode.X_08503));
  4.         }

  5.         sessionContext.currentMaxRows = 0;
  6.         isBatch = false;

  7.         JavaSystem.gc();

  8.         switch (cmd.mode) {

  9.             case ResultConstants.LARGE_OBJECT_OP : {
  10.                 return performLOBOperation((ResultLob) cmd);
  11.             }
  12.             case ResultConstants.EXECUTE : {
  13.                 int maxRows = cmd.getUpdateCount();

  14.                 if (maxRows == -1) {
  15.                     sessionContext.currentMaxRows = 0;
  16.                 } else {
  17.                     sessionContext.currentMaxRows = maxRows;
  18.                 }

  19.                 Statement cs = cmd.statement;

  20.                 if (cs == null
  21.                         || cs.compileTimestamp
  22.                            < database.schemaManager.schemaChangeTimestamp) {
  23.                     long csid = cmd.getStatementID();

  24.                     cs = statementManager.getStatement(this, csid);

  25.                     cmd.setStatement(cs);

  26.                     if (cs == null) {

  27.                         // invalid sql has been removed already
  28.                         return Result.newErrorResult(
  29.                             Error.error(ErrorCode.X_07502));
  30.                     }
  31.                 }

  32.                 Object[] pvals = (Object[]) cmd.valueData;
  33.                 Result result = executeCompiledStatement(cs, pvals,
  34.                     cmd.queryTimeout);

  35.                 result = performPostExecute(cmd, result);

  36.                 return result;
  37.             }
  38.             case ResultConstants.BATCHEXECUTE : {
  39.                 isBatch = true;

  40.                 Result result = executeCompiledBatchStatement(cmd);

  41.                 result = performPostExecute(cmd, result);

  42.                 return result;
  43.             }
  44.             case ResultConstants.EXECDIRECT : {
  45.                 Result result = executeDirectStatement(cmd);

  46.                 result = performPostExecute(cmd, result);

  47.                 return result;
  48.             }
  49.             case ResultConstants.BATCHEXECDIRECT : {
  50.                 isBatch = true;

  51.                 Result result = executeDirectBatchStatement(cmd);

  52.                 result = performPostExecute(cmd, result);

  53.                 return result;
  54.             }
  55.             case ResultConstants.PREPARE : {
  56.                 Statement cs;

  57.                 try {
  58.                     cs = statementManager.compile(this, cmd);
  59.                 } catch (Throwable t) {
  60.                     String errorString = cmd.getMainString();

  61.                     if (database.getProperties().getErrorLevel()
  62.                             == HsqlDatabaseProperties.NO_MESSAGE) {
  63.                         errorString = null;
  64.                     }

  65.                     return Result.newErrorResult(t, errorString);
  66.                 }

  67.                 Result result = Result.newPrepareResponse(cs);

  68.                 if (cs.getType() == StatementTypes.SELECT_CURSOR
  69.                         || cs.getType() == StatementTypes.CALL) {
  70.                     sessionData.setResultSetProperties(cmd, result);
  71.                 }

  72.                 result = performPostExecute(cmd, result);

  73.                 return result;
  74.             }
  75.             case ResultConstants.CLOSE_RESULT : {
  76.                 closeNavigator(cmd.getResultId());

  77.                 return Result.updateZeroResult;
  78.             }
  79.             case ResultConstants.UPDATE_RESULT : {
  80.                 Result result = this.executeResultUpdate(cmd);

  81.                 result = performPostExecute(cmd, result);

  82.                 return result;
  83.             }
  84.             case ResultConstants.FREESTMT : {
  85.                 statementManager.freeStatement(cmd.getStatementID());

  86.                 return Result.updateZeroResult;
  87.             }
  88.             case ResultConstants.GETSESSIONATTR : {
  89.                 int id = cmd.getStatementType();

  90.                 return getAttributesResult(id);
  91.             }
  92.             case ResultConstants.SETSESSIONATTR : {
  93.                 return setAttributes(cmd);
  94.             }
  95.             case ResultConstants.ENDTRAN : {
  96.                 switch (cmd.getActionType()) {

  97.                     case ResultConstants.TX_COMMIT :
  98.                         try {
  99.                             commit(false);
  100.                         } catch (Throwable t) {
  101.                             return Result.newErrorResult(t);
  102.                         }
  103.                         break;

  104.                     case ResultConstants.TX_COMMIT_AND_CHAIN :
  105.                         try {
  106.                             commit(true);
  107.                         } catch (Throwable t) {
  108.                             return Result.newErrorResult(t);
  109.                         }
  110.                         break;

  111.                     case ResultConstants.TX_ROLLBACK :
  112.                         rollback(false);
  113.                         break;

  114.                     case ResultConstants.TX_ROLLBACK_AND_CHAIN :
  115.                         rollback(true);
  116.                         break;

  117.                     case ResultConstants.TX_SAVEPOINT_NAME_RELEASE :
  118.                         try {
  119.                             String name = cmd.getMainString();

  120.                             releaseSavepoint(name);
  121.                         } catch (Throwable t) {
  122.                             return Result.newErrorResult(t);
  123.                         }
  124.                         break;

  125.                     case ResultConstants.TX_SAVEPOINT_NAME_ROLLBACK :
  126.                         try {
  127.                             rollbackToSavepoint(cmd.getMainString());
  128.                         } catch (Throwable t) {
  129.                             return Result.newErrorResult(t);
  130.                         }
  131.                         break;

  132.                     case ResultConstants.PREPARECOMMIT :
  133.                         try {
  134.                             prepareCommit();
  135.                         } catch (Throwable t) {
  136.                             return Result.newErrorResult(t);
  137.                         }
  138.                         break;
  139.                 }

  140.                 return Result.updateZeroResult;
  141.             }
  142.             case ResultConstants.SETCONNECTATTR : {
  143.                 switch (cmd.getConnectionAttrType()) {

  144.                     case ResultConstants.SQL_ATTR_SAVEPOINT_NAME :
  145.                         try {
  146.                             savepoint(cmd.getMainString());
  147.                         } catch (Throwable t) {
  148.                             return Result.newErrorResult(t);
  149.                         }

  150.                     // case ResultConstants.SQL_ATTR_AUTO_IPD
  151.                     // - always true
  152.                     // default: throw - case never happens
  153.                 }

  154.                 return Result.updateZeroResult;
  155.             }
  156.             case ResultConstants.REQUESTDATA : {
  157.                 return sessionData.getDataResultSlice(cmd.getResultId(),
  158.                                                       cmd.getUpdateCount(),
  159.                                                       cmd.getFetchSize());
  160.             }
  161.             case ResultConstants.DISCONNECT : {
  162.                 close();

  163.                 return Result.updateZeroResult;
  164.             }
  165.             default : {
  166.                 return Result.newErrorResult(
  167.                     Error.runtimeError(ErrorCode.U_S0500, "Session"));
  168.             }
  169.         }
  170.     }
很多种命令,看起来相当复杂。我们挑其中比较简单的

点击(此处)折叠或打开

  1. case ResultConstants.EXECDIRECT : {
  2.                 Result result = executeDirectStatement(cmd);

  3.                 result = performPostExecute(cmd, result);

  4.                 return result;
  5.             }
当Result的mode为EXECDIRECT时,表示直接执行sql语句。(与之相对应的,我们知道有一种prepare之后再执行sql的方式)
我们先了解其处理过程,再回头来看其它命令,就更容易理解了。
进入executeDirectStatement:

点击(此处)折叠或打开

  1. public Result executeDirectStatement(Result cmd) {

  2.         String sql = cmd.getMainString();
  3.         HsqlArrayList list;
  4.         int maxRows = cmd.getUpdateCount();

  5.         if (maxRows == -1) {
  6.             sessionContext.currentMaxRows = 0;
  7.         } else if (sessionMaxRows == 0) {
  8.             sessionContext.currentMaxRows = maxRows;
  9.         } else {
  10.             sessionContext.currentMaxRows = sessionMaxRows;
  11.             sessionMaxRows = 0;
  12.         }

  13.         try {
  14.             list = parser.compileStatements(sql, cmd);
  15.         } catch (Throwable e) {
  16.             return Result.newErrorResult(e);
  17.         }

  18.         Result result = null;
  19.         boolean recompile = false;
  20.         HsqlName originalSchema = getCurrentSchemaHsqlName();

  21.         for (int i = 0; i < list.size(); i++) {
  22.             Statement cs = (Statement) list.get(i);

  23.             if (i > 0) {
  24.                 if (cs.getCompileTimestamp()
  25.                         > database.txManager.getGlobalChangeTimestamp()) {
  26.                     recompile = true;
  27.                 }

  28.                 if (cs.getSchemaName() != null
  29.                         && cs.getSchemaName() != originalSchema) {
  30.                     recompile = true;
  31.                 }
  32.             }

  33.             if (recompile) {
  34.                 cs = compileStatement(cs.getSQL(), cmd.getExecuteProperties());
  35.             }

  36.             cs.setGeneratedColumnInfo(cmd.getGeneratedResultType(),
  37.                                       cmd.getGeneratedResultMetaData());

  38.             result = executeCompiledStatement(cs, ValuePool.emptyObjectArray,
  39.                                               cmd.queryTimeout);

  40.             if (result.mode == ResultConstants.ERROR) {
  41.                 break;
  42.             }
  43.         }

  44.         return result;
  45.     }
sql语句是字符串,所以需要解析其涵义,hsql先将sql语句进行编译:parser.compileStatements(sql, cmd);(我们下一章再去关注,这个sql语句的编译过程)。返回的是HsqlArrayList——存Statement的List,意味着可能一个命令中包含多个执行的sql语句,然后循环执行该list中的Statement对象——executeCompiledStatement

  1. result = executeCompiledStatement(cs, ValuePool.emptyObjectArray,
  2.                                               cmd.queryTimeout);
再进入executeCompiledStatement

点击(此处)折叠或打开

  1. public Result executeCompiledStatement(Statement cs, Object[] pvals,
  2.                                            int timeout) {

  3.         Result r;

  4.         if (abortTransaction) {
  5.             rollbackNoCheck(false);

  6.             return Result.newErrorResult(Error.error(ErrorCode.X_40001));
  7.         }

  8.         if (sessionContext.depth > 0) {
  9.             if (sessionContext.noSQL.booleanValue()
  10.                     || cs.isAutoCommitStatement()) {
  11.                 return Result.newErrorResult(Error.error(ErrorCode.X_46000));
  12.             }
  13.         }

  14.         if (cs.isAutoCommitStatement()) {
  15.             if (isReadOnly()) {
  16.                 return Result.newErrorResult(Error.error(ErrorCode.X_25006));
  17.             }

  18.             try {

  19.                 /** special autocommit for backward compatibility */
  20.                 commit(false);
  21.             } catch (HsqlException e) {
  22.                 database.logger.logInfoEvent("Exception at commit");
  23.             }
  24.         }

  25.         sessionContext.currentStatement = cs;

  26.         boolean isTX = cs.isTransactionStatement();

  27.         if (!isTX) {
  28.             actionTimestamp =
  29.                 database.txManager.getNextGlobalChangeTimestamp();

  30.             sessionContext.setDynamicArguments(pvals);

  31.             // statements such as DISCONNECT may close the session
  32.             if (database.logger.getSqlEventLogLevel()
  33.                     >= SimpleLog.LOG_NORMAL) {
  34.                 database.logger.logStatementEvent(this, cs, pvals,
  35.                                                   Result.updateZeroResult,
  36.                                                   SimpleLog.LOG_NORMAL);
  37.             }

  38.             r = cs.execute(this);
  39.             sessionContext.currentStatement = null;

  40.             return r;
  41.         }

  42.         while (true) {
  43.             actionIndex = rowActionList.size();

  44.             database.txManager.beginAction(this, cs);

  45.             cs = sessionContext.currentStatement;

  46.             if (cs == null) {
  47.                 return Result.newErrorResult(Error.error(ErrorCode.X_07502));
  48.             }

  49.             if (abortTransaction) {
  50.                 rollbackNoCheck(false);

  51.                 sessionContext.currentStatement = null;

  52.                 return Result.newErrorResult(Error.error(ErrorCode.X_40001));
  53.             }

  54.             timeoutManager.startTimeout(timeout);

  55.             try {
  56.                 latch.await();
  57.             } catch (InterruptedException e) {
  58.                 abortTransaction = true;
  59.             }

  60.             boolean abortAction = timeoutManager.endTimeout();

  61.             if (abortAction) {
  62.                 r = Result.newErrorResult(Error.error(ErrorCode.X_40502));

  63.                 endAction(r);

  64.                 break;
  65.             }

  66.             if (abortTransaction) {
  67.                 rollbackNoCheck(false);

  68.                 sessionContext.currentStatement = null;

  69.                 return Result.newErrorResult(Error.error(ErrorCode.X_40001));
  70.             }

  71.             database.txManager.beginActionResume(this);

  72.             // tempActionHistory.add("sql execute " + cs.sql + " " + actionTimestamp + " " + rowActionList.size());
  73.             sessionContext.setDynamicArguments(pvals);

  74.             r = cs.execute(this);

  75.             if (database.logger.getSqlEventLogLevel()
  76.                     >= SimpleLog.LOG_NORMAL) {
  77.                 database.logger.logStatementEvent(this, cs, pvals, r,
  78.                                                   SimpleLog.LOG_NORMAL);
  79.             }

  80.             lockStatement = sessionContext.currentStatement;

  81.             // tempActionHistory.add("sql execute end " + actionTimestamp + " " + rowActionList.size());
  82.             endAction(r);

  83.             if (abortTransaction) {
  84.                 rollbackNoCheck(false);

  85.                 sessionContext.currentStatement = null;

  86.                 return Result.newErrorResult(Error.error(r.getException(),
  87.                         ErrorCode.X_40001, null));
  88.             }

  89.             if (redoAction) {
  90.                 redoAction = false;

  91.                 try {
  92.                     latch.await();
  93.                 } catch (InterruptedException e) {
  94.                     abortTransaction = true;
  95.                 }
  96.             } else {
  97.                 break;
  98.             }
  99.         }

  100.         if (sessionContext.depth == 0
  101.                 && (sessionContext.isAutoCommit.booleanValue()
  102.                     || cs.isAutoCommitStatement())) {
  103.             try {
  104.                 if (r.mode == ResultConstants.ERROR) {
  105.                     rollbackNoCheck(false);
  106.                 } else {
  107.                     commit(false);
  108.                 }
  109.             } catch (Exception e) {
  110.                 sessionContext.currentStatement = null;

  111.                 return Result.newErrorResult(Error.error(ErrorCode.X_40001,
  112.                         e));
  113.             }
  114.         }

  115.         sessionContext.currentStatement = null;

  116.         return r;
  117.     }
又是一个好长的函数~
我们忽略掉其中的错误检查部分,看函数的主体内容,其实就一个:

  1. r = cs.execute(this);
其它代码主要都在处理事务的东西。
而cs.execute则是一个抽象方法

点击(此处)折叠或打开

  1. public abstract Result execute(Session session);
它的具体代码,由Statement的子类来实现。

代码分析到现在,我们来理清一下命令的处理过程:
1. 大多数命令,都会进入Session.execute(Result)进行处理
2. 在Session.execute中,根据命令的类型,进行分别的处理
3. 对于Execute的命令(如DirectExecute),需要Statement对象来执行
4. Statement由parser编译而来——编译器根据语句内容,生成了不同的Statement子类的实例
5. 执行Statement.execute(Session)方法,返回Result对象

理清命令处理的思路之后,再回头来看Session.execute(Result cmd)的其它命令的处理,过程就变得清晰一点了。
其中BATCHEXECDIRECT和EXECDIRECT是类似的,比较让人困惑的是EXECUTE

点击(此处)折叠或打开

  1. case ResultConstants.EXECUTE : {
  2.                 int maxRows = cmd.getUpdateCount();

  3.                 if (maxRows == -1) {
  4.                     sessionContext.currentMaxRows = 0;
  5.                 } else {
  6.                     sessionContext.currentMaxRows = maxRows;
  7.                 }

  8.                 Statement cs = cmd.statement;

  9.                 if (cs == null
  10.                         || cs.compileTimestamp
  11.                            < database.schemaManager.schemaChangeTimestamp) {
  12.                     long csid = cmd.getStatementID();

  13.                     cs = statementManager.getStatement(this, csid);

  14.                     cmd.setStatement(cs);

  15.                     if (cs == null) {

  16.                         // invalid sql has been removed already
  17.                         return Result.newErrorResult(
  18.                             Error.error(ErrorCode.X_07502));
  19.                     }
  20.                 }

  21.                 Object[] pvals = (Object[]) cmd.valueData;
  22.                 Result result = executeCompiledStatement(cs, pvals,
  23.                     cmd.queryTimeout);

  24.                 result = performPostExecute(cmd, result);

  25.                 return result;
  26.             }
我在之前看这个代码的时候也很困惑,它的Statement居然是根据getStatementID取出来的。
而且在之前通信协议的解析中[Result.newResult]

点击(此处)折叠或打开

  1. case ResultConstants.EXECUTE :
  2.                 result.updateCount = in.readInt();
  3.                 result.fetchSize = in.readInt();
  4.                 result.statementID = in.readLong();
  5.                 result.rsProperties = in.readByte();
  6.                 result.queryTimeout = in.readShort();

  7.                 Statement statement =
  8.                     session.statementManager.getStatement(session,
  9.                         result.statementID);

  10.                 if (statement == null) {

  11.                     // invalid statement
  12.                     result.mode = ResultConstants.EXECUTE_INVALID;
  13.                     result.valueData = ValuePool.emptyObjectArray;

  14.                     break;
  15.                 }

  16.                 result.statement = statement;
  17.                 result.metaData = result.statement.getParametersMetaData();
  18.                 result.valueData = readSimple(in, result.metaData);
  19.                 break;
这里也是通过statementManager.getStatement取出来的。那这个Statement是什么时候存进去的呢?
秘密就在于Prepare

点击(此处)折叠或打开

  1. case ResultConstants.PREPARE : {
  2.                 Statement cs;

  3.                 try {
  4.                     cs = statementManager.compile(this, cmd);
  5.                 } catch (Throwable t) {
  6.                     String errorString = cmd.getMainString();

  7.                     if (database.getProperties().getErrorLevel()
  8.                             == HsqlDatabaseProperties.NO_MESSAGE) {
  9.                         errorString = null;
  10.                     }

  11.                     return Result.newErrorResult(t, errorString);
  12.                 }

  13.                 Result result = Result.newPrepareResponse(cs);

  14.                 if (cs.getType() == StatementTypes.SELECT_CURSOR
  15.                         || cs.getType() == StatementTypes.CALL) {
  16.                     sessionData.setResultSetProperties(cmd, result);
  17.                 }

  18.                 result = performPostExecute(cmd, result);

  19.                 return result;
  20.             }
也就是在这里,hsql将sql进行了预编译,然后生成一个statementId返回给客户端,客户端在下次请求执行的时候,必须将这个statementId带上,然后hsql从statementManager中取出之前编译好的sql语句(即statement对象)。

所以,我们知道,为什么对于需要多次执行的sql语句,为什么prepare的性能要高一些,因为executeDirect的语句需要每次编译,而prepare的sql语句只需要编译一次。

这里我们就只解释这些命令了,其它命令相对比较简单,就不逐一进行说明了,有兴趣的自己查看源码。

好,本文就到这,下回我们分析sql语句的编译过程。

[相关源码文件]
Result.java
Session.java
StatementManager.java











阅读(2549) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~