Implement thread enqueueing and synchronisation to facilitate binlog group commit. Introduce a queue of transactions waiting to commit to the binary log, protected by a short-time mutex LOCK_queue. While waiting for LOCK_log to become available, many transactions can queue up, and once the LOCK_log becomes available, the first thread in the queue will do a group commit for all of them, amortising the cost of fsync() across several transactions. Introduce a condition variable COND_queue, and add data to binlog_trx_data, to facilitate communication between the thread doing the group commit and the other participating threads. Move things around a bit so that the group commit (which is done in a single thread for multiple transactions) does not need the THD; we want to avoid thread-specific stuff in that loop, to minimise the risk that code is introduced that uses thread local storage (and thus ends up getting the data for the wrong thread). --- sql/log.cc | 437 ++++++++++++++++++++++++++++++++++++++++++++++--------------- sql/log.h | 26 +++ 2 files changed, 353 insertions(+), 110 deletions(-) Index: work-5.1-groupcommit/sql/log.cc =================================================================== --- work-5.1-groupcommit.orig/sql/log.cc 2010-06-09 11:54:22.000000000 +0200 +++ work-5.1-groupcommit/sql/log.cc 2010-06-09 11:54:26.000000000 +0200 @@ -258,12 +258,26 @@ public: */ my_off_t before_stmt_pos; - /* Which type of event to end a binlogged transaction with. */ - enum { - COMMIT_EVENT_COMMIT, COMMIT_EVENT_ROLLBACK, COMMIT_EVENT_XID - } commit_event_kind; - /* The xid value for COMMIT_EVENT_XID. */ - my_xid xid; + /* 0 or error when writing to binlog; set during group commit. */ + int error; + /* If error != 0, value of errno (for my_error() reporting). */ + int commit_errno; + /* Link for queueing transactions up for group commit to binlog. 
*/ + binlog_trx_data *next; + /* + Flag set true when group commit for this transaction is finished; used by + each transaction with pthread_cond_wait() to wait until commit is done. + This flag is protected by the binlog-global LOCK_queue. + */ + bool done; + /* + Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be + written during group commit. The incident_event is only valid if + has_incident() is true. + */ + Log_event *begin_event; + Log_event *end_event; + Log_event *incident_event; }; handlerton *binlog_hton; @@ -1408,7 +1422,8 @@ binlog_flush_trx_cache_prepare(THD *thd) } static int -binlog_flush_trx_cache_log(THD *thd, binlog_trx_data *trx_data) +binlog_flush_trx_cache_log(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev) { IO_CACHE *trans_log= &trx_data->trans_log; /* @@ -1421,7 +1436,7 @@ binlog_flush_trx_cache_log(THD *thd, bin were, we would have to ensure that we're not ending a statement inside a stored function. */ - int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data); + int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, end_ev); trx_data->reset(); @@ -1455,7 +1470,8 @@ binlog_flush_trx_cache_log(THD *thd, bin contain updates to non-transactional tables. 
*/ static int -binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data) +binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev) { DBUG_ENTER("binlog_flush_trx_cache"); DBUG_PRINT("info", ("thd->options={ %s%s}", @@ -1465,7 +1481,7 @@ binlog_flush_trx_cache(THD *thd, binlog_ if (binlog_flush_trx_cache_prepare(thd)) DBUG_RETURN(1); - int error= binlog_flush_trx_cache_log(thd, trx_data); + int error= binlog_flush_trx_cache_log(thd, trx_data, end_ev); DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); DBUG_RETURN(error); @@ -1518,7 +1534,7 @@ binlog_truncate_trx_cache(THD *thd, binl if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) { if (trx_data->has_incident()) - error= mysql_bin_log.write_incident(thd, TRUE); + error= mysql_bin_log.write_incident(thd); trx_data->reset(); } else // ...statement @@ -1584,8 +1600,8 @@ static int binlog_commit(handlerton *hto !stmt_has_updated_trans_table(thd) && thd->transaction.stmt.modified_non_trans_table)) { - trx_data->commit_event_kind= binlog_trx_data::COMMIT_EVENT_COMMIT; - error= binlog_flush_trx_cache(thd, trx_data); + Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); + error= binlog_flush_trx_cache(thd, trx_data, &end_ev); } trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0; @@ -1669,8 +1685,8 @@ static int binlog_rollback(handlerton *h thd->current_stmt_binlog_row_based) || ((thd->options & OPTION_KEEP_LOG))) { - trx_data->commit_event_kind= binlog_trx_data::COMMIT_EVENT_ROLLBACK; - error= binlog_flush_trx_cache(thd, trx_data); + Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); + error= binlog_flush_trx_cache(thd, trx_data, &end_ev); } /* Otherwise, we simply truncate the cache as there is no change on @@ -2520,7 +2536,9 @@ void MYSQL_BIN_LOG::cleanup() delete description_event_for_exec; (void) pthread_mutex_destroy(&LOCK_log); (void) pthread_mutex_destroy(&LOCK_index); + (void) 
pthread_mutex_destroy(&LOCK_queue); (void) pthread_cond_destroy(&update_cond); + (void) pthread_cond_destroy(&COND_queue); } DBUG_VOID_RETURN; } @@ -2548,7 +2566,10 @@ void MYSQL_BIN_LOG::init_pthread_objects */ (void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index", MYF_NO_DEADLOCK_DETECTION); + (void) my_pthread_mutex_init(&LOCK_queue, MY_MUTEX_INIT_FAST, "LOCK_queue", + MYF(0)); (void) pthread_cond_init(&update_cond, 0); + (void) pthread_cond_init(&COND_queue, 0); } @@ -4734,26 +4755,25 @@ int query_error_code(THD *thd, bool not_ return error; } -bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock) +static LEX_STRING const write_error_msg= + { C_STRING_WITH_LEN("error writing to the binary log") }; + +bool MYSQL_BIN_LOG::write_incident(THD *thd) { uint error= 0; DBUG_ENTER("MYSQL_BIN_LOG::write_incident"); - LEX_STRING const write_error_msg= - { C_STRING_WITH_LEN("error writing to the binary log") }; Incident incident= INCIDENT_LOST_EVENTS; Incident_log_event ev(thd, incident, write_error_msg); - if (lock) - pthread_mutex_lock(&LOCK_log); + + pthread_mutex_lock(&LOCK_log); error= ev.write(&log_file); - if (lock) + if (!error && !(error= flush_and_sync())) { - if (!error && !(error= flush_and_sync())) - { - signal_update(); - rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); - } - pthread_mutex_unlock(&LOCK_log); + signal_update(); + rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } + pthread_mutex_unlock(&LOCK_log); + DBUG_RETURN(error); } @@ -4782,67 +4802,241 @@ bool MYSQL_BIN_LOG::write_incident(THD * */ bool -MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data) +MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev) +{ + DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); + + /* + Create the necessary events here, where we have the correct THD (and + thread context). + + Due to group commit the actual writing to binlog may happen in a different + thread. 
+ */ + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); + trx_data->begin_event= &qinfo; + trx_data->end_event= end_ev; + if (trx_data->has_incident()) + { + Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg); + trx_data->incident_event= &inc_ev; + DBUG_RETURN(write_transaction_to_binlog_events(trx_data)); + } + else + { + trx_data->incident_event= NULL; + DBUG_RETURN(write_transaction_to_binlog_events(trx_data)); + } +} + +bool +MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data) +{ + /* + To facilitate group commit for the binlog, we first queue up ourselves in + the group commit queue. Then the first thread to enter the queue waits for + the LOCK_log mutex, and commits for everyone in the queue once it gets the + lock. Any other threads in the queue just wait for the first one to finish + the commit and wake them up. + */ + pthread_mutex_lock(&LOCK_queue); + const binlog_trx_data *orig_queue= group_commit_queue; + trx_data->done= false; + enqueue_trx(trx_data); + + if (orig_queue != NULL) + return trx_group_commit_participant(trx_data); + else + return trx_group_commit_leader(trx_data); +} + +/* + Participate as secondary transaction in group commit. + + Another thread is already waiting to obtain the LOCK_log, and should include + this thread in the group commit once the log is obtained. So here we put + ourself in the queue and wait to be signalled that the group commit is done. + + Note that this function must be called with the LOCK_queue locked; the mutex + will be released before return. +*/ +bool +MYSQL_BIN_LOG::trx_group_commit_participant(binlog_trx_data *trx_data) +{ + safe_mutex_assert_owner(&LOCK_queue); + + /* + Wait until trx_data.done == true. + Note that the condition is guarded by LOCK_queue. 
+ */ + do + { + pthread_cond_wait(&COND_queue, &LOCK_queue); + } while (!trx_data->done); + + pthread_mutex_unlock(&LOCK_queue); + + return MYSQL_BIN_LOG::trx_group_commit_finish(trx_data); +} + +bool +MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data) +{ + if (trx_data->error) + { + switch (trx_data->error) + { + case ER_ERROR_ON_WRITE: + my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, trx_data->commit_errno); + break; + case ER_ERROR_ON_READ: + my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), + trx_data->trans_log.file_name, trx_data->commit_errno); + break; + default: + /* + There are not (and should not be) any errors thrown not covered above. + But just in case one is added later without updating the above switch + statement, include a catch-all. + */ + my_printf_error(trx_data->error, + "Error writing transaction to binary log: %d", + MYF(ME_NOREFRESH), trx_data->error); + } + + /* + Since we return error, this transaction XID will not be committed, so + we need to mark it as not needed for recovery (unlog() is not called + for a transaction if log_xid() fails). + */ + if (trx_data->end_event->get_type_code() == XID_EVENT) + mark_xid_done(); + + return 1; + } + + return 0; +} + +/* + Do binlog group commit as the lead thread. + + This must be called when this thread/transaction is queued at the start of + the group_commit_queue. It will wait to obtain the LOCK_log mutex, then group + commit all the transactions in the queue (more may have entered while waiting + for LOCK_log). After commit is done, all other threads in the queue will be + signalled. + + Note that this function must be called with the LOCK_queue locked; the mutex + will be released before return. 
+ */ +bool +MYSQL_BIN_LOG::trx_group_commit_leader(binlog_trx_data *our_trx_data) { uint xid_count= 0; uint write_count= 0; - int error; - bool got_error= false; - DBUG_ENTER("MYSQL_BIN_LOG::write"); + safe_mutex_assert_owner(&LOCK_queue); + + /* + Release the lock on the queue so that more threads can enter the queue + while we wait to obtain LOCK_log. + */ + VOID(pthread_mutex_unlock(&LOCK_queue)); VOID(pthread_mutex_lock(&LOCK_log)); + /* + Get the queue of transactions waiting to be committed. Note that the list + is in reverse order, with ourself at the end. So reverse the list in + preparation for committing the transactions in the correct order. + */ + VOID(pthread_mutex_lock(&LOCK_queue)); + binlog_trx_data *queue= get_trx_queue(); + VOID(pthread_mutex_unlock(&LOCK_queue)); + + DBUG_ASSERT(queue != NULL); + + binlog_trx_data *current= queue; + queue= NULL; + while (current) + { + current->error= 0; + binlog_trx_data *next= current->next; + current->next= queue; + queue= current; + current= next; + } + + /* + Now we have in QUEUE the list of transactions to be committed in order, + with ourself at the head. + */ + DBUG_ASSERT(queue == our_trx_data); + DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { - IO_CACHE *cache= &trx_data->trans_log; - write_error= 0; /* - We only bother to write to the binary log if there is anything - to write. - */ - if (my_b_tell(cache) > 0) + Commit every transaction in the queue. + + Note that we are doing this in a different thread than the one running + the transaction! So we are limited in the operations we can do. In + particular, we cannot call my_error() on behalf of a transaction, as + that obtains the THD from thread local storage. Instead, we must set + current->error and let the thread do the error reporting itself once + we wake it up. 
+ */ + for (current= queue; current != NULL; current= current->next) { - error= write_transaction(thd, trx_data); - if (error) + IO_CACHE *cache= &current->trans_log; + + /* + We only bother to write to the binary log if there is anything + to write. + */ + if (my_b_tell(cache) > 0) { - got_error= 1; - if (!write_error) - { - write_error= 1; - my_error(error, MYF(ME_NOREFRESH), name, errno); - } + current->error= write_transaction(current); + if (current->error) + current->commit_errno= errno; + + write_count++; + if (current->end_event->get_type_code() == XID_EVENT) + xid_count++; } - - write_count++; - if (trx_data->commit_event_kind == binlog_trx_data::COMMIT_EVENT_XID) - xid_count++; } if (write_count > 0) { if (flush_and_sync()) - goto err; - - signal_update(); - - if (got_error) - goto err; + { + for (current= queue; current != NULL; current= current->next) + { + if (!current->error) + { + current->error= ER_ERROR_ON_WRITE; + current->commit_errno= errno; + } + } + } + else + { + signal_update(); + } /* if any commit_events are Xid_log_event, increase the number of prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated if there're prepared xids in it - see the comment in new_file() for an explanation. - If no Xid_log_event (then it's all Query_log_event) rotate binlog, + If no Xid_log_events (then it's all Query_log_event) rotate binlog, if necessary. */ if (xid_count > 0) { - pthread_mutex_lock(&LOCK_prep_xids); - prepared_xids+= xid_count; - pthread_mutex_unlock(&LOCK_prep_xids); + mark_xids_active(xid_count); } else rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); @@ -4851,30 +5045,28 @@ MYSQL_BIN_LOG::write_transaction_to_binl VOID(pthread_mutex_unlock(&LOCK_log)); - DBUG_RETURN(0); - -err: - if (!write_error) + VOID(pthread_mutex_lock(&LOCK_queue)); + /* Signal everyone except the first entry, which is ourself. 
*/ + for (current= queue->next; current != NULL; current= current->next) { - write_error= 1; - my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, errno); + current->done= true; } - VOID(pthread_mutex_unlock(&LOCK_log)); - DBUG_RETURN(1); + pthread_cond_broadcast(&COND_queue); + VOID(pthread_mutex_unlock(&LOCK_queue)); + + return trx_group_commit_finish(our_trx_data); } int -MYSQL_BIN_LOG::write_transaction(THD *thd, binlog_trx_data *trx_data) +MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data) { IO_CACHE *cache= &trx_data->trans_log; /* - Log "BEGIN" at the beginning of every transaction. Here, a - transaction is either a BEGIN..COMMIT block or a single - statement in autocommit mode. - */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); + Log "BEGIN" at the beginning of every transaction. Here, a transaction is + either a BEGIN..COMMIT block or a single statement in autocommit mode. The + event was constructed in write_transaction_to_binlog(), in the thread + running the transaction. - /* Now this Query_log_event has artificial log_pos 0. It must be adjusted to reflect the real position in the log. Not doing it would confuse the slave: it would prevent this one from @@ -4882,7 +5074,7 @@ MYSQL_BIN_LOG::write_transaction(THD *th in wrong positions being shown to the user, MASTER_POS_WAIT undue waiting etc. 
*/ - if (qinfo.write(&log_file)) + if (trx_data->begin_event->write(&log_file)) return ER_ERROR_ON_WRITE; DBUG_EXECUTE_IF("crash_before_writing_xid", @@ -4899,35 +5091,10 @@ MYSQL_BIN_LOG::write_transaction(THD *th if (write_cache(cache)) return ER_ERROR_ON_WRITE; - switch(trx_data->commit_event_kind) - { - case binlog_trx_data::COMMIT_EVENT_COMMIT: - { - Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); - if (end_ev.write(&log_file)) - return ER_ERROR_ON_WRITE; - break; - } - case binlog_trx_data::COMMIT_EVENT_ROLLBACK: - { - Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); - if (end_ev.write(&log_file)) - return ER_ERROR_ON_WRITE; - break; - } - case binlog_trx_data::COMMIT_EVENT_XID: - { - Xid_log_event end_ev(thd, trx_data->xid); - if (end_ev.write(&log_file)) - return ER_ERROR_ON_WRITE; - break; - } - default: - DBUG_ASSERT(false /* Illegal COMMIT_EVENT_XXX value */); - break; - } + if (trx_data->end_event->write(&log_file)) + return ER_ERROR_ON_WRITE; - if (trx_data->has_incident() && write_incident(thd, FALSE)) + if (trx_data->has_incident() && trx_data->incident_event->write(&log_file)) return ER_ERROR_ON_WRITE; if (cache->error) // Error on read @@ -4936,6 +5103,30 @@ MYSQL_BIN_LOG::write_transaction(THD *th return 0; } +void +MYSQL_BIN_LOG::enqueue_trx(binlog_trx_data *trx_data) +{ + binlog_trx_data *head; + + safe_mutex_assert_owner(&LOCK_queue); + + trx_data->next= group_commit_queue; + group_commit_queue= trx_data; +} + +binlog_trx_data * +MYSQL_BIN_LOG::get_trx_queue() +{ + binlog_trx_data *queue; + + safe_mutex_assert_owner(&LOCK_queue); + + queue= group_commit_queue; + group_commit_queue= NULL; + + return queue; +} + /** Wait until we get a signal that the binary log has been updated. 
@@ -5944,16 +6135,43 @@ int TC_LOG_BINLOG::log_xid(THD *thd, my_ DBUG_ENTER("TC_LOG_BINLOG::log"); binlog_trx_data *trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - trx_data->commit_event_kind= binlog_trx_data::COMMIT_EVENT_XID; - trx_data->xid= xid; + Xid_log_event end_ev(thd, xid); + /* We always commit the entire transaction when writing an XID. Also note that the return value is inverted. */ - DBUG_RETURN(!binlog_flush_trx_cache_log(thd, trx_data)); + DBUG_RETURN(!binlog_flush_trx_cache_log(thd, trx_data, &end_ev)); } -void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) +/* + After an XID is logged, we need to hold on to the current binlog file until + it is fully committed in the storage engine. The reason is that crash + recovery only looks at the latest binlog, so we must make sure there are no + outstanding prepared (but not committed) transactions before rotating the + binlog. + + To handle this, we keep a count of outstanding XIDs. This function is used + to increase this count when committing one or more transactions to the + binary log. +*/ +void +TC_LOG_BINLOG::mark_xids_active(uint xid_count) +{ + pthread_mutex_lock(&LOCK_prep_xids); + prepared_xids+= xid_count; + pthread_mutex_unlock(&LOCK_prep_xids); +} + +/* + Once an XID is committed, it is safe to rotate the binary log, as it can no + longer be needed during crash recovery. + + This function is called to mark an XID this way. It needs to decrease the + count of pending XIDs, and signal the log rotator thread when it reaches zero. 
+*/ +void +TC_LOG_BINLOG::mark_xid_done() { pthread_mutex_lock(&LOCK_prep_xids); DBUG_ASSERT(prepared_xids > 0); @@ -5962,6 +6180,11 @@ void TC_LOG_BINLOG::unlog(ulong cookie, pthread_cond_signal(&COND_prep_xids); } pthread_mutex_unlock(&LOCK_prep_xids); +} + +void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) +{ + mark_xid_done(); rotate_and_purge(0); // as ::write_transaction_to_binlog() did not rotate } Index: work-5.1-groupcommit/sql/log.h =================================================================== --- work-5.1-groupcommit.orig/sql/log.h 2010-06-09 11:54:22.000000000 +0200 +++ work-5.1-groupcommit/sql/log.h 2010-06-09 11:54:26.000000000 +0200 @@ -234,8 +234,17 @@ class MYSQL_BIN_LOG: public TC_LOG, priv /* LOCK_log and LOCK_index are inited by init_pthread_objects() */ pthread_mutex_t LOCK_index; pthread_mutex_t LOCK_prep_xids; + /* + Mutex to protect the queue of transactions waiting to participate in group commit. + */ + pthread_mutex_t LOCK_queue; pthread_cond_t COND_prep_xids; pthread_cond_t update_cond; + /* + Condition used to signal threads participating in group commit that the + commit is done. Used together with LOCK_queue. + */ + pthread_cond_t COND_queue; ulonglong bytes_written; IO_CACHE index_file; char index_file_name[FN_REFLEN]; @@ -272,6 +281,8 @@ class MYSQL_BIN_LOG: public TC_LOG, priv In 5.0 it's 0 for relay logs too! */ bool no_auto_events; + /* Queue of transactions queued up to participate in group commit. 
*/ + binlog_trx_data *group_commit_queue; int write_to_file(IO_CACHE *cache); /* @@ -281,7 +292,15 @@ class MYSQL_BIN_LOG: public TC_LOG, priv */ void new_file_without_locking(); void new_file_impl(bool need_lock); - int write_transaction(THD *thd, binlog_trx_data *trx_data); + int write_transaction(binlog_trx_data *trx_data); + bool write_transaction_to_binlog_events(binlog_trx_data *trx_data); + bool trx_group_commit_participant(binlog_trx_data *trx_data); + bool trx_group_commit_finish(binlog_trx_data *trx_data); + bool trx_group_commit_leader(binlog_trx_data *our_trx_data); + void enqueue_trx(binlog_trx_data *trx_data); + binlog_trx_data *get_trx_queue(); + void mark_xid_done(); + void mark_xids_active(uint xid_count); public: MYSQL_LOG::generate_name; @@ -354,8 +373,9 @@ public: void new_file(); bool write(Log_event* event_info); // binary log write - bool write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data); - bool write_incident(THD *thd, bool lock); + bool write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev); + bool write_incident(THD *thd); int write_cache(IO_CACHE *cache); void set_write_error(THD *thd);