Split binlog transaction write in preparation for group commit In preparation for binlog group commit, split the writing of a transaction to binlog into two functions: one for the work to do for each commit in the group, and another for the stuff that needs to be done only once for the whole group. --- sql/log.cc | 208 ++++++++++++++++++++++++++++++++++++++----------------------- sql/log.h | 4 - 2 files changed, 134 insertions(+), 78 deletions(-) Index: work-5.1-groupcommit/sql/log.cc =================================================================== --- work-5.1-groupcommit.orig/sql/log.cc 2010-05-05 11:03:45.000000000 +0200 +++ work-5.1-groupcommit/sql/log.cc 2010-05-05 11:19:08.000000000 +0200 @@ -257,6 +257,13 @@ public: Binlog position before the start of the current statement. */ my_off_t before_stmt_pos; + + /* Which type of event to end a binlogged transaction with. */ + enum { + COMMIT_EVENT_COMMIT, COMMIT_EVENT_ROLLBACK, COMMIT_EVENT_XID + } commit_event_kind; + /* The xid value for COMMIT_EVENT_XID. */ + my_xid xid; }; handlerton *binlog_hton; @@ -1401,8 +1408,7 @@ binlog_flush_trx_cache_prepare(THD *thd) } static int -binlog_flush_trx_cache_log(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev) +binlog_flush_trx_cache_log(THD *thd, binlog_trx_data *trx_data) { IO_CACHE *trans_log= &trx_data->trans_log; /* @@ -1415,7 +1421,7 @@ binlog_flush_trx_cache_log(THD *thd, bin were, we would have to ensure that we're not ending a statement inside a stored function. */ - int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, end_ev); + int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data); trx_data->reset(); @@ -1449,11 +1455,9 @@ binlog_flush_trx_cache_log(THD *thd, bin contain updates to non-transactional tables. 
*/ static int -binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev) +binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data) { DBUG_ENTER("binlog_flush_trx_cache"); - DBUG_PRINT("enter", ("end_ev: 0x%lx", (long) end_ev)); DBUG_PRINT("info", ("thd->options={ %s%s}", FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), FLAGSTR(thd->options, OPTION_BEGIN))); @@ -1461,7 +1465,7 @@ binlog_flush_trx_cache(THD *thd, binlog_ if (binlog_flush_trx_cache_prepare(thd)) DBUG_RETURN(1); - int error= binlog_flush_trx_cache_log(thd, trx_data, end_ev); + int error= binlog_flush_trx_cache_log(thd, trx_data); DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); DBUG_RETURN(error); @@ -1580,8 +1584,8 @@ static int binlog_commit(handlerton *hto !stmt_has_updated_trans_table(thd) && thd->transaction.stmt.modified_non_trans_table)) { - Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); - error= binlog_flush_trx_cache(thd, trx_data, &qev); + trx_data->commit_event_kind= binlog_trx_data::COMMIT_EVENT_COMMIT; + error= binlog_flush_trx_cache(thd, trx_data); } trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0; @@ -1665,8 +1669,8 @@ static int binlog_rollback(handlerton *h thd->current_stmt_binlog_row_based) || ((thd->options & OPTION_KEEP_LOG))) { - Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); - error= binlog_flush_trx_cache(thd, trx_data, &qev); + trx_data->commit_event_kind= binlog_trx_data::COMMIT_EVENT_ROLLBACK; + error= binlog_flush_trx_cache(thd, trx_data); } /* Otherwise, we simply truncate the cache as there is no change on @@ -4777,15 +4781,16 @@ bool MYSQL_BIN_LOG::write_incident(THD * 'cache' needs to be reinitialized after this functions returns. 
*/ -bool MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, - binlog_trx_data *trx_data, - Log_event *commit_event) +bool +MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data) { - DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)"); - VOID(pthread_mutex_lock(&LOCK_log)); + uint xid_count= 0; + uint write_count= 0; + int error; + bool got_error= false; + DBUG_ENTER("MYSQL_BIN_LOG::write"); - /* NULL would represent nothing to replicate after ROLLBACK */ - DBUG_ASSERT(commit_event != NULL); + VOID(pthread_mutex_lock(&LOCK_log)); DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true @@ -4799,73 +4804,51 @@ bool MYSQL_BIN_LOG::write_transaction_to */ if (my_b_tell(cache) > 0) { - /* - Log "BEGIN" at the beginning of every transaction. Here, a - transaction is either a BEGIN..COMMIT block or a single - statement in autocommit mode. - */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); - - /* - Now this Query_log_event has artificial log_pos 0. It must be - adjusted to reflect the real position in the log. Not doing it - would confuse the slave: it would prevent this one from - knowing where he is in the master's binlog, which would result - in wrong positions being shown to the user, MASTER_POS_WAIT - undue waiting etc. 
- */ - if (qinfo.write(&log_file)) - goto err; + error= write_transaction(thd, trx_data); + if (error) + { + got_error= 1; + if (!write_error) + { + write_error= 1; + my_error(error, MYF(ME_NOREFRESH), name, errno); + } + } - DBUG_EXECUTE_IF("crash_before_writing_xid", - { - if (write_cache(cache)) - DBUG_PRINT("info", ("error writing binlog cache")); - else - flush_and_sync(); - - DBUG_PRINT("info", ("crashing before writing xid")); - abort(); - }); + write_count++; + if (trx_data->commit_event_kind == binlog_trx_data::COMMIT_EVENT_XID) + xid_count++; + } - if (write_cache(cache)) + if (write_count > 0) + { + if (flush_and_sync()) goto err; - if (commit_event && commit_event->write(&log_file)) - goto err; + signal_update(); - if (trx_data->has_incident() && write_incident(thd, FALSE)) + if (got_error) goto err; - if (flush_and_sync()) - goto err; - DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_ABORT();); - if (cache->error) // Error on read + /* + if any commit_events are Xid_log_event, increase the number of + prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated + if there're prepared xids in it - see the comment in new_file() for + an explanation. + If no Xid_log_event (then it's all Query_log_event) rotate binlog, + if necessary. + */ + if (xid_count > 0) { - my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), cache->file_name, errno); - write_error=1; // Don't give more errors - goto err; + pthread_mutex_lock(&LOCK_prep_xids); + prepared_xids+= xid_count; + pthread_mutex_unlock(&LOCK_prep_xids); } - signal_update(); - } - - /* - if commit_event is Xid_log_event, increase the number of - prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated - if there're prepared xids in it - see the comment in new_file() for - an explanation. - If the commit_event is not Xid_log_event (then it's a Query_log_event) - rotate binlog, if necessary. 
- */ - if (commit_event && commit_event->get_type_code() == XID_EVENT) - { - pthread_mutex_lock(&LOCK_prep_xids); - prepared_xids++; - pthread_mutex_unlock(&LOCK_prep_xids); + else + rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } - else - rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } + VOID(pthread_mutex_unlock(&LOCK_log)); DBUG_RETURN(0); @@ -4880,6 +4863,78 @@ err: DBUG_RETURN(1); } +int +MYSQL_BIN_LOG::write_transaction(THD *thd, binlog_trx_data *trx_data) +{ + IO_CACHE *cache= &trx_data->trans_log; + /* + Log "BEGIN" at the beginning of every transaction. Here, a + transaction is either a BEGIN..COMMIT block or a single + statement in autocommit mode. + */ + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); + + /* + Now this Query_log_event has artificial log_pos 0. It must be + adjusted to reflect the real position in the log. Not doing it + would confuse the slave: it would prevent this one from + knowing where he is in the master's binlog, which would result + in wrong positions being shown to the user, MASTER_POS_WAIT + undue waiting etc. 
+ */ + if (qinfo.write(&log_file)) + return ER_ERROR_ON_WRITE; + + DBUG_EXECUTE_IF("crash_before_writing_xid", + { + if ((write_cache(cache))) + DBUG_PRINT("info", ("error writing binlog cache")); + else + flush_and_sync(); + + DBUG_PRINT("info", ("crashing before writing xid")); + abort(); + }); + + if (write_cache(cache)) + return ER_ERROR_ON_WRITE; + + switch(trx_data->commit_event_kind) + { + case binlog_trx_data::COMMIT_EVENT_COMMIT: + { + Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); + if (end_ev.write(&log_file)) + return ER_ERROR_ON_WRITE; + break; + } + case binlog_trx_data::COMMIT_EVENT_ROLLBACK: + { + Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); + if (end_ev.write(&log_file)) + return ER_ERROR_ON_WRITE; + break; + } + case binlog_trx_data::COMMIT_EVENT_XID: + { + Xid_log_event end_ev(thd, trx_data->xid); + if (end_ev.write(&log_file)) + return ER_ERROR_ON_WRITE; + break; + } + default: + DBUG_ASSERT(false /* Illegal COMMIT_EVENT_XXX value */); + break; + } + + if (trx_data->has_incident() && write_incident(thd, FALSE)) + return ER_ERROR_ON_WRITE; + + if (cache->error) // Error on read + return ER_ERROR_ON_READ; + + return 0; +} /** Wait until we get a signal that the binary log has been updated. @@ -5887,14 +5942,15 @@ void TC_LOG_BINLOG::close() int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid) { DBUG_ENTER("TC_LOG_BINLOG::log"); - Xid_log_event xle(thd, xid); binlog_trx_data *trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + trx_data->commit_event_kind= binlog_trx_data::COMMIT_EVENT_XID; + trx_data->xid= xid; /* We always commit the entire transaction when writing an XID. Also note that the return value is inverted. 
*/ - DBUG_RETURN(!binlog_flush_trx_cache_log(thd, trx_data, &xle)); + DBUG_RETURN(!binlog_flush_trx_cache_log(thd, trx_data)); } void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) Index: work-5.1-groupcommit/sql/log.h =================================================================== --- work-5.1-groupcommit.orig/sql/log.h 2010-05-05 10:59:13.000000000 +0200 +++ work-5.1-groupcommit/sql/log.h 2010-05-05 11:03:47.000000000 +0200 @@ -281,6 +281,7 @@ class MYSQL_BIN_LOG: public TC_LOG, priv */ void new_file_without_locking(); void new_file_impl(bool need_lock); + int write_transaction(THD *thd, binlog_trx_data *trx_data); public: MYSQL_LOG::generate_name; @@ -353,8 +354,7 @@ public: void new_file(); bool write(Log_event* event_info); // binary log write - bool write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, - Log_event *commit_event); + bool write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data); bool write_incident(THD *thd, bool lock); int write_cache(IO_CACHE *cache);