Implement the prepare_ordered() and commit_ordered() semantics for handler and binlog. This patch adds the handler calls prepare_ordered() and commit_ordered(), which are guaranteed to be called in consistent commit order across all participating engines. It also adds group_log_xid() and makes binlog use it instead of log_xid() to do group commit and maintain same commit order as prepare_ordered() and commit_ordered. --- sql/handler.cc | 510 +++++++++++++++++++++++++++++++++++++++++++++++-------- sql/handler.h | 74 +++++++ sql/log.cc | 394 ++++++++++++++++++++++++++++-------------- sql/log.h | 59 +++++- sql/sql_class.cc | 2 sql/sql_class.h | 17 + 6 files changed, 847 insertions(+), 209 deletions(-) Index: work-5.1-groupcommit/sql/log.cc =================================================================== --- work-5.1-groupcommit.orig/sql/log.cc 2010-05-24 16:09:32.000000000 +0200 +++ work-5.1-groupcommit/sql/log.cc 2010-05-24 16:09:32.000000000 +0200 @@ -154,9 +154,12 @@ class binlog_trx_data { public: binlog_trx_data() : at_least_one_stmt_committed(0), incident(FALSE), m_pending(0), - before_stmt_pos(MY_OFF_T_UNDEF) + before_stmt_pos(MY_OFF_T_UNDEF), using_xa(0) { trans_log.end_of_file= max_binlog_cache_size; + (void) my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW, + "LOCK_group_commit", MYF(0)); + (void) pthread_cond_init(&COND_group_commit, 0); } ~binlog_trx_data() @@ -213,6 +216,7 @@ public: before_stmt_pos= MY_OFF_T_UNDEF; incident= FALSE; trans_log.end_of_file= max_binlog_cache_size; + using_xa= FALSE; DBUG_ASSERT(empty()); } @@ -265,12 +269,23 @@ public: /* Link for queueing transactions up for group commit to binlog. */ binlog_trx_data *next; /* - Flag set true when group commit for this transaction is finished; used by - each transaction with pthread_cond_wait() to wait until commit is done. - This flag is protected by the binlog-global LOCK_queue. 
+ Flag set true when group commit for this transaction is finished; used + with pthread_cond_wait() to wait until commit is done. + This flag is protected by LOCK_group_commit. */ bool done; /* + Flag set if this transaction is the group commit leader that will handle + the actual writing to the binlog. + This flag is protected by LOCK_group_commit. + */ + bool group_commit_leader; + /* + Flag set true if this transaction is committed with log_xid() as part of + XA, false if not. + */ + bool using_xa; + /* Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be written during group commit. The incident_event is only valid if has_incident() is true. @@ -278,6 +293,9 @@ public: Log_event *begin_event; Log_event *end_event; Log_event *incident_event; + /* Mutex and condition for wakeup after group commit. */ + pthread_mutex_t LOCK_group_commit; + pthread_cond_t COND_group_commit; }; handlerton *binlog_hton; @@ -1421,22 +1439,10 @@ binlog_flush_trx_cache_prepare(THD *thd) return 0; } -static int -binlog_flush_trx_cache_log(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev) +static void +binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data) { IO_CACHE *trans_log= &trx_data->trans_log; - /* - Doing a commit or a rollback including non-transactional tables, - i.e., ending a transaction where we might write the transaction - cache to the binary log. - - We can always end the statement when ending a transaction since - transactions are not allowed inside stored functions. If they - were, we would have to ensure that we're not ending a statement - inside a stored function. 
- */ - int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, end_ev); trx_data->reset(); @@ -1446,7 +1452,6 @@ binlog_flush_trx_cache_log(THD *thd, bin statistic_increment(binlog_cache_disk_use, &LOCK_status); trans_log->disk_writes= 0; } - return error; } /* @@ -1481,7 +1486,19 @@ binlog_flush_trx_cache(THD *thd, binlog_ if (binlog_flush_trx_cache_prepare(thd)) DBUG_RETURN(1); - int error= binlog_flush_trx_cache_log(thd, trx_data, end_ev); + /* + Doing a commit or a rollback including non-transactional tables, + i.e., ending a transaction where we might write the transaction + cache to the binary log. + + We can always end the statement when ending a transaction since + transactions are not allowed inside stored functions. If they + were, we would have to ensure that we're not ending a statement + inside a stored function. + */ + int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, end_ev); + + binlog_flush_trx_cache_finish(thd, trx_data); DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); DBUG_RETURN(error); @@ -1544,9 +1561,58 @@ binlog_truncate_trx_cache(THD *thd, binl DBUG_RETURN(error); } +static LEX_STRING const write_error_msg= + { C_STRING_WITH_LEN("error writing to the binary log") }; + static int binlog_prepare(handlerton *hton, THD *thd, bool all) { - return binlog_flush_trx_cache_prepare(thd); + /* + If this prepare is for a single statement in the middle of a transactions, + not the actual transaction commit, then we do nothing. The real work is + only done later, in the prepare for making persistent changes. + */ + if (!all && (thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) + return 0; + + binlog_trx_data *trx_data= + (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + + trx_data->using_xa= TRUE; + + if (binlog_flush_trx_cache_prepare(thd)) + return 1; + + my_xid xid= thd->transaction.xid_state.xid.get_my_xid(); + if (!xid) + { + /* Skip logging this transaction, marked by setting end_event to NULL. 
*/ + trx_data->end_event= NULL; + return 0; + } + + /* + Allocate the extra events that will be logged to the binlog in binlog group + commit. Use placement new to allocate them on the THD memroot, as they need + to remain live until log_xid() returns. + */ + size_t needed_size= sizeof(Query_log_event) + sizeof(Xid_log_event); + if (trx_data->has_incident()) + needed_size+= sizeof(Incident_log_event); + uchar *mem= (uchar *)thd->alloc(needed_size); + if (!mem) + return 1; + + trx_data->begin_event= new ((void *)mem) + Query_log_event(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); + mem+= sizeof(Query_log_event); + + trx_data->end_event= new ((void *)mem) Xid_log_event(thd, xid); + + if (trx_data->has_incident()) + trx_data->incident_event= new ((void *)(mem + sizeof(Xid_log_event))) + Incident_log_event(thd, INCIDENT_LOST_EVENTS, write_error_msg); + + return 0; } /** @@ -1569,11 +1635,11 @@ static int binlog_commit(handlerton *hto binlog_trx_data *const trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - if (trx_data->empty()) + if (trx_data->using_xa) { // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid() - trx_data->reset(); - DBUG_RETURN(0); + binlog_flush_trx_cache_finish(thd, trx_data); + DBUG_RETURN(error); } /* @@ -2521,6 +2587,7 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG() index_file_name[0] = 0; bzero((char*) &index_file, sizeof(index_file)); bzero((char*) &purge_index_file, sizeof(purge_index_file)); + use_group_log_xid= TRUE; } /* this is called only once */ @@ -2538,7 +2605,6 @@ void MYSQL_BIN_LOG::cleanup() (void) pthread_mutex_destroy(&LOCK_index); (void) pthread_mutex_destroy(&LOCK_queue); (void) pthread_cond_destroy(&update_cond); - (void) pthread_cond_destroy(&COND_queue); } DBUG_VOID_RETURN; } @@ -2569,7 +2635,6 @@ void MYSQL_BIN_LOG::init_pthread_objects (void) my_pthread_mutex_init(&LOCK_queue, MY_MUTEX_INIT_FAST, "LOCK_queue", MYF(0)); (void) pthread_cond_init(&update_cond, 0); - (void) pthread_cond_init(&COND_queue, 0); } 
@@ -4755,9 +4820,6 @@ int query_error_code(THD *thd, bool not_ return error; } -static LEX_STRING const write_error_msg= - { C_STRING_WITH_LEN("error writing to the binary log") }; - bool MYSQL_BIN_LOG::write_incident(THD *thd) { uint error= 0; @@ -4840,15 +4902,24 @@ MYSQL_BIN_LOG::write_transaction_to_binl lock. Any other threads in the queue just wait for the first one to finish the commit and wake them up. */ - pthread_mutex_lock(&LOCK_queue); - const binlog_trx_data *orig_queue= group_commit_queue; - trx_data->done= false; - enqueue_trx(trx_data); + + pthread_mutex_lock(&trx_data->LOCK_group_commit); + const binlog_trx_data *orig_queue= atomic_enqueue_trx(trx_data); if (orig_queue != NULL) - return trx_group_commit_participant(trx_data); + { + trx_data->group_commit_leader= FALSE; + trx_data->done= FALSE; + trx_group_commit_participant(trx_data); + } else - return trx_group_commit_leader(trx_data); + { + trx_data->group_commit_leader= TRUE; + pthread_mutex_unlock(&trx_data->LOCK_group_commit); + trx_group_commit_leader(NULL); + } + + return trx_group_commit_finish(trx_data); } /* @@ -4858,26 +4929,19 @@ MYSQL_BIN_LOG::write_transaction_to_binl this thread in the group commit once the log is obtained. So here we put ourself in the queue and wait to be signalled that the group commit is done. - Note that this function must be called with the LOCK_queue locked; the mutex - will be released before return. + Note that this function must be called with the trx_data->LOCK_group_commit + locked; the mutex will be released before return. */ -bool +void MYSQL_BIN_LOG::trx_group_commit_participant(binlog_trx_data *trx_data) { - safe_mutex_assert_owner(&LOCK_queue); - - /* - Wait until trx_data.done == true. - Note that the condition is guarded by LOCK_queue. 
- */ - do - { - pthread_cond_wait(&COND_queue, &LOCK_queue); - } while (!trx_data->done); - - pthread_mutex_unlock(&LOCK_queue); + safe_mutex_assert_owner(&trx_data->LOCK_group_commit); - return MYSQL_BIN_LOG::trx_group_commit_finish(trx_data); + /* Wait until trx_data.done == true and woken up by the leader. */ + while (!trx_data->done) + pthread_cond_wait(&trx_data->COND_group_commit, + &trx_data->LOCK_group_commit); + pthread_mutex_unlock(&trx_data->LOCK_group_commit); } bool @@ -4928,56 +4992,71 @@ MYSQL_BIN_LOG::trx_group_commit_finish(b for LOCK_log). After commit is done, all other threads in the queue will be signalled. - Note that this function must be called with the LOCK_queue locked; the mutex - will be released before return. */ -bool -MYSQL_BIN_LOG::trx_group_commit_leader(binlog_trx_data *our_trx_data) +void +MYSQL_BIN_LOG::trx_group_commit_leader(THD *first_thd) { uint xid_count= 0; uint write_count= 0; - safe_mutex_assert_owner(&LOCK_queue); + /* First, put anything from group_log_xid into the queue. */ + binlog_trx_data *full_queue= NULL; + binlog_trx_data **next_ptr= &full_queue; + for (THD *thd= first_thd; thd; thd= thd->next_commit_ordered) + { + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + + /* Skip log_xid for transactions without xid, marked by NULL end_event. */ + if (!trx_data->end_event) + continue; + + trx_data->error= 0; + *next_ptr= trx_data; + next_ptr= &(trx_data->next); + } /* - Release the lock on the queue so that more threads can enter the queue - while we wait to obtain LOCK_log. + Next, lock the LOCK_log(), and once we get it, add any additional writes + that queued up while we were waiting. + + Note that if some writer not going through log_xid() comes in and gets the + LOCK_log before us, they will not be able to include us in their group + commit (and they are not able to handle ensuring same commit order between + us and participating transactional storage engines anyway). 
+ + On the other hand, when we get the LOCK_log, we will be able to include + any non-transactional writes that queued up in our group commit. This + should hopefully not be too big of a problem, as group commit is most + important for the transactional case anyway when durability (fsync) is + enabled. + */ - VOID(pthread_mutex_unlock(&LOCK_queue)); VOID(pthread_mutex_lock(&LOCK_log)); /* - Get the queue of transactions waiting to be committed. Note that the list - is in reverse order, with ourself at the end. So reverse the list in - preparation for committing the transactions in the correct order. - */ - VOID(pthread_mutex_lock(&LOCK_queue)); - binlog_trx_data *queue= get_trx_queue(); - VOID(pthread_mutex_unlock(&LOCK_queue)); - - DBUG_ASSERT(queue != NULL); - - binlog_trx_data *current= queue; - queue= NULL; + As the queue is in reverse order of entering, reverse the queue as we add + it to the existing one. Note that there is no ordering defined between + transactional and non-transactional commits. + */ + binlog_trx_data *current= atomic_grab_trx_queue(); + binlog_trx_data *xtra_queue= NULL; while (current) { current->error= 0; binlog_trx_data *next= current->next; - current->next= queue; - queue= current; + current->next= xtra_queue; + xtra_queue= current; current= next; } + *next_ptr= xtra_queue; /* - Now we have in QUEUE the list of transactions to be committed in order, - with ourself at the head. + Now we have in full_queue the list of transactions to be committed in + order. */ - DBUG_ASSERT(queue == our_trx_data); - DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { - /* Commit every transaction in the queue. @@ -4988,7 +5067,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(b current->error and let the thread do the error reporting itself once we wake it up. 
*/ - for (current= queue; current != NULL; current= current->next) + for (current= full_queue; current != NULL; current= current->next) { IO_CACHE *cache= &current->trans_log; @@ -5003,16 +5082,17 @@ current->commit_errno= errno; write_count++; - if (current->end_event->get_type_code() == XID_EVENT) - xid_count++; } + + if (current->end_event->get_type_code() == XID_EVENT) + xid_count++; } if (write_count > 0) { if (flush_and_sync()) { - for (current= queue; current != NULL; current= current->next) + for (current= full_queue; current != NULL; current= current->next) { if (!current->error) { @@ -5025,36 +5105,49 @@ { signal_update(); } + } - /* - if any commit_events are Xid_log_event, increase the number of - prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated - if there're prepared xids in it - see the comment in new_file() for - an explanation. - If no Xid_log_events (then it's all Query_log_event) rotate binlog, - if necessary. - */ - if (xid_count > 0) - { - mark_xids_active(xid_count); - } - else - rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); + /* + if any commit_events are Xid_log_event, increase the number of + prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated + if there're prepared xids in it - see the comment in new_file() for + an explanation. + If no Xid_log_events (then it's all Query_log_event) rotate binlog, + if necessary. + */ + if (xid_count > 0) + { + mark_xids_active(xid_count); } + else + rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } VOID(pthread_mutex_unlock(&LOCK_log)); - VOID(pthread_mutex_lock(&LOCK_queue)); - /* Signal everyone except the first entry, which is ourself. */ - for (current= queue->next; current != NULL; current= current->next) + /* + Signal those that are not part of group_log_xid, and are not group leaders + running the queue. 
+ + Since a group leader runs the queue itself if a group_log_xid does not get + to do it first, such leader threads do not need wait or wakeup. + */ + for (current= xtra_queue; current != NULL; current= current->next) { - current->done= true; - } - pthread_cond_broadcast(&COND_queue); - VOID(pthread_mutex_unlock(&LOCK_queue)); + /* + Note that we need to take LOCK_group_commit even in the case of a leader! - return trx_group_commit_finish(our_trx_data); + Otherwise there is a race between setting and testing the + group_commit_leader flag. + */ + pthread_mutex_lock(&current->LOCK_group_commit); + if (!current->group_commit_leader) + { + current->done= true; + pthread_cond_signal(&current->COND_group_commit); + } + pthread_mutex_unlock(&current->LOCK_group_commit); + } } int @@ -5103,27 +5196,29 @@ MYSQL_BIN_LOG::write_transaction(binlog_ return 0; } -void -MYSQL_BIN_LOG::enqueue_trx(binlog_trx_data *trx_data) +binlog_trx_data * +MYSQL_BIN_LOG::atomic_enqueue_trx(binlog_trx_data *trx_data) { - binlog_trx_data *head; - - safe_mutex_assert_owner(&LOCK_queue); - + my_atomic_rwlock_wrlock(&LOCK_queue); trx_data->next= group_commit_queue; - group_commit_queue= trx_data; + while (!my_atomic_casptr((void **)(&group_commit_queue), + (void **)(&trx_data->next), + trx_data)) + ; + my_atomic_rwlock_wrunlock(&LOCK_queue); + return trx_data->next; } binlog_trx_data * -MYSQL_BIN_LOG::get_trx_queue() +MYSQL_BIN_LOG::atomic_grab_trx_queue() { - binlog_trx_data *queue; - - safe_mutex_assert_owner(&LOCK_queue); - - queue= group_commit_queue; - group_commit_queue= NULL; - + my_atomic_rwlock_wrlock(&LOCK_queue); + binlog_trx_data *queue= group_commit_queue; + while (!my_atomic_casptr((void **)(&group_commit_queue), + (void **)(&queue), + NULL)) + ; + my_atomic_rwlock_wrunlock(&LOCK_queue); return queue; } @@ -6122,9 +6217,6 @@ void TC_LOG_BINLOG::close() } /** - @todo - group commit - @retval 0 error @retval @@ -6132,16 +6224,48 @@ void TC_LOG_BINLOG::close() */ int TC_LOG_BINLOG::log_xid(THD *thd, 
my_xid xid) { - DBUG_ENTER("TC_LOG_BINLOG::log"); - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - Xid_log_event end_ev(thd, xid); + int error; + DBUG_ENTER("TC_LOG_BINLOG::log_xid"); + + thd->next_commit_ordered= 0; + group_log_xid(thd); + if (thd->xid_error) + error= xid_delayed_error(thd); + else + error= 0; /* - We always commit the entire transaction when writing an XID. Also - note that the return value is inverted. - */ - DBUG_RETURN(!binlog_flush_trx_cache_log(thd, trx_data, &end_ev)); + Note that the return value is inverted: zero on failure, private non-zero + 'cookie' on success. + */ + DBUG_RETURN(!error); +} + +/* + Do a binlog log_xid() for a group of transactions, linked through + thd->next_commit_ordered. +*/ +void +TC_LOG_BINLOG::group_log_xid(THD *first_thd) +{ + DBUG_ENTER("TC_LOG_BINLOG::group_log_xid"); + trx_group_commit_leader(first_thd); + for (THD *thd= first_thd; thd; thd= thd->next_commit_ordered) + { + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + thd->xid_error= trx_data->error; + thd->xid_cookie= !trx_data->error; + } + DBUG_VOID_RETURN; +} + +int +TC_LOG_BINLOG::xid_delayed_error(THD *thd) +{ + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + return trx_group_commit_finish(trx_data); } /* @@ -6158,9 +6282,12 @@ int TC_LOG_BINLOG::log_xid(THD *thd, my_ void TC_LOG_BINLOG::mark_xids_active(uint xid_count) { + DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active"); + DBUG_PRINT("info", ("xid_count=%u", xid_count)); pthread_mutex_lock(&LOCK_prep_xids); prepared_xids+= xid_count; pthread_mutex_unlock(&LOCK_prep_xids); + DBUG_VOID_RETURN; } /* @@ -6173,6 +6300,7 @@ TC_LOG_BINLOG::mark_xids_active(uint xid void TC_LOG_BINLOG::mark_xid_done() { + DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done"); pthread_mutex_lock(&LOCK_prep_xids); DBUG_ASSERT(prepared_xids > 0); if (--prepared_xids == 0) { @@ -6180,12 +6308,16 @@ 
TC_LOG_BINLOG::mark_xid_done() pthread_cond_signal(&COND_prep_xids); } pthread_mutex_unlock(&LOCK_prep_xids); + DBUG_VOID_RETURN; } void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) { - mark_xid_done(); + DBUG_ENTER("TC_LOG_BINLOG::unlog"); + if (xid) + mark_xid_done(); rotate_and_purge(0); // as ::write_transaction_to_binlog() did not rotate + DBUG_VOID_RETURN; } int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle) Index: work-5.1-groupcommit/sql/log.h =================================================================== --- work-5.1-groupcommit.orig/sql/log.h 2010-05-24 16:09:32.000000000 +0200 +++ work-5.1-groupcommit/sql/log.h 2010-05-24 16:09:32.000000000 +0200 @@ -28,13 +28,49 @@ class TC_LOG { public: int using_heuristic_recover(); - TC_LOG() {} + /* True if we should use group_log_xid(), false to use log_xid(). */ + bool use_group_log_xid; + + TC_LOG() : use_group_log_xid(0) {} virtual ~TC_LOG() {} virtual int open(const char *opt_name)=0; virtual void close()=0; virtual int log_xid(THD *thd, my_xid xid)=0; virtual void unlog(ulong cookie, my_xid xid)=0; + /* + If use_group_log_xid is true, then this method is used instead of + log_xid() to do logging of a group of transactions all at once. + + The transactions will be linked through THD::next_commit_ordered. + + Additionally, when this method is used instead of log_xid(), the order in + which handler->prepare_ordered() and handler->commit_ordered() are called + is guaranteed to be the same as the order of calls and THD list elements + for group_log_xid(). + + This can be used to efficiently implement group commit that at the same + time preserves the order of commits among handlers and TC (eg. to get same + commit order in InnoDB and binary log). + + For TCs that do not need this, it can be preferable to use plain log_xid() + instead, as it allows threads to run log_xid() in parallel with each + other. 
In contrast, group_log_xid() runs under a global mutex, so it is + guaranteed that only one call into it will be active at once. + + Since this call handles multiple threads/THDs at once, my_error() (and + other code that relies on thread local storage) cannot be used in this + method. Instead, in case of error, thd->xid_error should be set to the + error code, and xid_delayed_error() will be called later in the correct + thread context to actually report the error. + + In the success case, this method must set thd->xid_cookie for each thread + to the cookie that is normally returned from log_xid() (which must be + non-zero in the non-error case). + */ + virtual void group_log_xid(THD *first_thd) { DBUG_ASSERT(FALSE); } + /* Error reporting for group_log_xid(). */ + virtual int xid_delayed_error(THD *thd) { DBUG_ASSERT(FALSE); return 0; } }; class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging @@ -235,16 +271,13 @@ class MYSQL_BIN_LOG: public TC_LOG, priv pthread_mutex_t LOCK_index; pthread_mutex_t LOCK_prep_xids; /* - Mutex to protect the queue of transactions waiting to participate in group commit. + Mutex to protect the queue of transactions waiting to participate in group + commit. (Only used on platforms without native atomic operations). */ pthread_mutex_t LOCK_queue; + pthread_cond_t COND_prep_xids; pthread_cond_t update_cond; - /* - Condition used to signal threads participating in group commit that the - commit is done. Used together with LOCK_queue. 
- */ - pthread_cond_t COND_queue; ulonglong bytes_written; IO_CACHE index_file; char index_file_name[FN_REFLEN]; @@ -294,11 +327,10 @@ class MYSQL_BIN_LOG: public TC_LOG, priv void new_file_impl(bool need_lock); int write_transaction(binlog_trx_data *trx_data); bool write_transaction_to_binlog_events(binlog_trx_data *trx_data); - bool trx_group_commit_participant(binlog_trx_data *trx_data); - bool trx_group_commit_finish(binlog_trx_data *trx_data); - bool trx_group_commit_leader(binlog_trx_data *our_trx_data); - void enqueue_trx(binlog_trx_data *trx_data); - binlog_trx_data *get_trx_queue(); + void trx_group_commit_participant(binlog_trx_data *trx_data); + void trx_group_commit_leader(THD *first_thd); + binlog_trx_data *atomic_enqueue_trx(binlog_trx_data *trx_data); + binlog_trx_data *atomic_grab_trx_queue(); void mark_xid_done(); void mark_xids_active(uint xid_count); @@ -330,6 +362,8 @@ public: int open(const char *opt_name); void close(); int log_xid(THD *thd, my_xid xid); + int xid_delayed_error(THD *thd); + void group_log_xid(THD *first_thd); void unlog(ulong cookie, my_xid xid); int recover(IO_CACHE *log, Format_description_log_event *fdle); #if !defined(MYSQL_CLIENT) @@ -375,6 +409,7 @@ public: bool write(Log_event* event_info); // binary log write bool write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, Log_event *end_ev); + bool trx_group_commit_finish(binlog_trx_data *trx_data); bool write_incident(THD *thd); int write_cache(IO_CACHE *cache); Index: work-5.1-groupcommit/sql/handler.cc =================================================================== --- work-5.1-groupcommit.orig/sql/handler.cc 2010-05-24 16:09:17.000000000 +0200 +++ work-5.1-groupcommit/sql/handler.cc 2010-05-24 18:10:54.000000000 +0200 @@ -76,6 +76,7 @@ TYPELIB tx_isolation_typelib= {array_ele static TYPELIB known_extensions= {0,"known_exts", NULL, NULL}; uint known_extensions_id= 0; +static int commit_one_phase_2(THD *thd, bool all, bool do_commit_ordered); static 
plugin_ref ha_default_plugin(THD *thd) @@ -544,6 +545,26 @@ err: DBUG_RETURN(1); } +/* + This is a queue of THDs waiting for being group committed with + tc_log->group_log_xid(). +*/ +static THD *group_commit_queue; +/* + This mutex protects the group_commit_queue on platforms without native + atomic operations. + */ +static pthread_mutex_t LOCK_group_commit_queue; +/* This mutex is used to serialize calls to handler prepare_ordered methods. */ +static pthread_mutex_t LOCK_prepare_ordered; +/* This mutex is used to serialize calls to handler commit_ordered methods. */ +static pthread_mutex_t LOCK_commit_ordered; +/* This mutex is used to serialize calls to group_log_xid(). */ +static pthread_mutex_t LOCK_group_commit; +static pthread_cond_t COND_group_commit; + +static bool mutexes_inited= FALSE; + int ha_init() { int error= 0; @@ -557,6 +578,19 @@ int ha_init() */ opt_using_transactions= total_ha>(ulong)opt_bin_log; savepoint_alloc_size+= sizeof(SAVEPOINT); + + group_commit_queue= NULL; + my_pthread_mutex_init(&LOCK_group_commit_queue, MY_MUTEX_INIT_FAST, + "LOCK_group_commit_queue", MYF(0)); + my_pthread_mutex_init(&LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW, + "LOCK_prepare_ordered", MYF(0)); + my_pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_SLOW, + "LOCK_commit_ordered", MYF(0)); + my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW, + "LOCK_group_commit", MYF(0)); + pthread_cond_init(&COND_group_commit, 0); + mutexes_inited= TRUE; + DBUG_RETURN(error); } @@ -574,6 +608,15 @@ int ha_end() if (ha_finish_errors()) error= 1; + if (mutexes_inited) + { + pthread_mutex_destroy(&LOCK_group_commit_queue); + pthread_mutex_destroy(&LOCK_prepare_ordered); + pthread_mutex_destroy(&LOCK_commit_ordered); + pthread_mutex_destroy(&LOCK_group_commit); + mutexes_inited= FALSE; + } + DBUG_RETURN(error); } @@ -1053,6 +1096,108 @@ ha_check_and_coalesce_trx_read_only(THD return rw_ha_count; } +/* + Atomically enqueue a THD at the head of the queue of threads waiting 
to + group commit, and return the previous head of the queue. +*/ +static THD * +enqueue_atomic(THD *thd) +{ + my_atomic_rwlock_wrlock(&LOCK_group_commit_queue); + thd->next_commit_ordered= group_commit_queue; + while (!my_atomic_casptr((void **)(&group_commit_queue), + (void **)(&thd->next_commit_ordered), + thd)) + ; + my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue); + return thd->next_commit_ordered; +} + +static THD * +atomic_grab_reverse_queue() +{ + my_atomic_rwlock_wrlock(&LOCK_group_commit_queue); + THD *queue= group_commit_queue; + while (!my_atomic_casptr((void **)(&group_commit_queue), + (void **)(&queue), + NULL)) + ; + my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue); + + /* + Since we enqueue at the head, the queue is actually in reverse order. + So reverse it back into correct commit order before returning. + */ + THD *prev= NULL; + while (queue) + { + THD *next= queue->next_commit_ordered; + queue->next_commit_ordered= prev; + prev= queue; + queue= next; + } + + return prev; +} + +static void +call_commit_ordered(Ha_trx_info *ha_info, THD *thd, bool all) +{ + for (; ha_info; ha_info= ha_info->next()) + { + handlerton *ht= ha_info->ht(); + if (!ht->commit_ordered) + continue; + ht->commit_ordered(ht, thd, all); + } +} + +static void +group_commit_wait_for_wakeup(THD *thd) +{ + pthread_mutex_lock(&thd->LOCK_commit_ordered); + while (!thd->group_commit_ready) + pthread_cond_wait(&thd->COND_commit_ordered, + &thd->LOCK_commit_ordered); + pthread_mutex_unlock(&thd->LOCK_commit_ordered); +} + +static void +group_commit_wakeup_other(THD *other_thd) +{ + pthread_mutex_lock(&other_thd->LOCK_commit_ordered); + other_thd->group_commit_ready= TRUE; + pthread_cond_signal(&other_thd->COND_commit_ordered); + pthread_mutex_unlock(&other_thd->LOCK_commit_ordered); +} + +static bool group_commit_queue_busy= 0; + +static void +group_commit_mark_queue_idle() +{ + pthread_mutex_lock(&LOCK_group_commit); + group_commit_queue_busy= FALSE; + 
pthread_cond_signal(&COND_group_commit); + pthread_mutex_unlock(&LOCK_group_commit); +} + +static void +group_commit_mark_queue_busy() +{ + safe_mutex_assert_owner(&LOCK_group_commit); + group_commit_queue_busy= TRUE; +} + +static void +group_commit_wait_queue_idle() +{ + /* Wait for any existing queue run to finish. */ + safe_mutex_assert_owner(&LOCK_group_commit); + while (group_commit_queue_busy) + pthread_cond_wait(&COND_group_commit, &LOCK_group_commit); +} + /** @retval @@ -1070,7 +1215,7 @@ ha_check_and_coalesce_trx_read_only(THD */ int ha_commit_trans(THD *thd, bool all) { - int error= 0, cookie= 0; + int error= 0; /* 'all' means that this is either an explicit commit issued by user, or an implicit commit issued by a DDL. @@ -1085,7 +1230,9 @@ int ha_commit_trans(THD *thd, bool all) */ bool is_real_trans= all || thd->transaction.all.ha_list == 0; Ha_trx_info *ha_info= trans->ha_list; - my_xid xid= thd->transaction.xid_state.xid.get_my_xid(); + bool need_prepare_ordered, need_commit_ordered; + bool need_enqueue; + my_xid xid; DBUG_ENTER("ha_commit_trans"); /* @@ -1118,85 +1265,277 @@ int ha_commit_trans(THD *thd, bool all) DBUG_RETURN(2); } #ifdef USING_TRANSACTIONS - if (ha_info) + if (!ha_info) { - uint rw_ha_count; - bool rw_trans; + /* Free resources and perform other cleanup even for 'empty' transactions. 
*/ + if (is_real_trans) + thd->transaction.cleanup(); + DBUG_RETURN(0); + } - DBUG_EXECUTE_IF("crash_commit_before", abort();); + DBUG_EXECUTE_IF("crash_commit_before", abort();); - /* Close all cursors that can not survive COMMIT */ - if (is_real_trans) /* not a statement commit */ - thd->stmt_map.close_transient_cursors(); + /* Close all cursors that can not survive COMMIT */ + if (is_real_trans) /* not a statement commit */ + thd->stmt_map.close_transient_cursors(); - rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all); - /* rw_trans is TRUE when we in a transaction changing data */ - rw_trans= is_real_trans && (rw_ha_count > 0); + uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all); + /* rw_trans is TRUE when we in a transaction changing data */ + bool rw_trans= is_real_trans && (rw_ha_count > 0); - if (rw_trans && - wait_if_global_read_lock(thd, 0, 0)) - { - ha_rollback_trans(thd, all); - DBUG_RETURN(1); - } + if (rw_trans && + wait_if_global_read_lock(thd, 0, 0)) + { + ha_rollback_trans(thd, all); + DBUG_RETURN(1); + } + + if (rw_trans && + opt_readonly && + !(thd->security_ctx->master_access & SUPER_ACL) && + !thd->slave_thread) + { + my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only"); + goto err; + } - if (rw_trans && - opt_readonly && - !(thd->security_ctx->master_access & SUPER_ACL) && - !thd->slave_thread) + if (trans->no_2pc || (rw_ha_count <= 1)) + { + error= ha_commit_one_phase(thd, all); + DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); + goto end; + } + + need_prepare_ordered= FALSE; + need_commit_ordered= FALSE; + xid= thd->transaction.xid_state.xid.get_my_xid(); + + for (Ha_trx_info *hi= ha_info; hi; hi= hi->next()) + { + int err; + handlerton *ht= hi->ht(); + /* + Do not call two-phase commit if this particular + transaction is read-only. This allows for simpler + implementation in engines that are always read-only. + */ + if (! 
hi->is_trx_read_write()) + continue; + /* + Sic: we know that prepare() is not NULL since otherwise + trans->no_2pc would have been set. + */ + if ((err= ht->prepare(ht, thd, all))) + my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); + status_var_increment(thd->status_var.ha_prepare_count); + + if (err) + goto err; + + if (ht->prepare_ordered) + need_prepare_ordered= TRUE; + if (ht->commit_ordered) + need_commit_ordered= TRUE; + } + DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_ABORT();); + + if (!is_real_trans) + { + error= commit_one_phase_2(thd, all, FALSE); + DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); + goto end; + } + + /* + We can optimise away some of the thread synchronisation that may not be + needed. + + If need_prepare_ordered, then we need to take LOCK_prepare_ordered. + + If (xid && use_group_log_xid), then we need to enqueue (and this must + be done under LOCK_prepare_ordered if we take that lock). + + Similarly, if (need_prepare_ordered && need_commit_ordered), then we + need to enqueue under the LOCK_prepare_ordered. + + If (xid && use_group_log_xid), then we need to take LOCK_group_commit. + + If need_commit_ordered, then we need to take LOCK_commit_ordered. + + Cases not covered by above can be skipped to optimise things a bit. + */ + need_enqueue= (xid && tc_log->use_group_log_xid) || + (need_prepare_ordered && need_commit_ordered); + + thd->group_commit_ready= FALSE; + thd->group_commit_all= all; + if (need_prepare_ordered) + { + pthread_mutex_lock(&LOCK_prepare_ordered); + + for (Ha_trx_info *hi= ha_info; hi; hi= hi->next()) { - my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only"); - ha_rollback_trans(thd, all); - error= 1; - goto end; + int err; + handlerton *ht= hi->ht(); + if (! 
hi->is_trx_read_write()) + continue; + if (ht->prepare_ordered && (err= ht->prepare_ordered(ht, thd, all))) + { + my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); + pthread_mutex_unlock(&LOCK_prepare_ordered); + goto err; + } } + } + if (need_enqueue) + { + THD *previous_queue= enqueue_atomic(thd); + thd->is_commit_ordered_leader= (previous_queue == NULL); + } + if (need_prepare_ordered) + pthread_mutex_unlock(&LOCK_prepare_ordered); - if (!trans->no_2pc && (rw_ha_count > 1)) + int cookie; + if (tc_log->use_group_log_xid) + { + if (thd->is_commit_ordered_leader) { - for (; ha_info && !error; ha_info= ha_info->next()) + pthread_mutex_lock(&LOCK_group_commit); + group_commit_wait_queue_idle(); + + THD *queue= atomic_grab_reverse_queue(); + /* The first in the queue is the leader. */ + DBUG_ASSERT(queue == thd); + + /* + This will set individual error codes in each thd->xid_error, as + well as set thd->xid_cookie for later unlog() call. + */ + tc_log->group_log_xid(queue); + + /* + Call commit_ordered methods for all transactions in the queue + (that did not get an error in group_log_xid()). + + We do this under an additional global LOCK_commit_ordered; this is + so that transactions that do not need 2-phase commit do not have + to wait for the potentially long duration of LOCK_group_commit. + */ + if (need_commit_ordered) { - int err; - handlerton *ht= ha_info->ht(); - /* - Do not call two-phase commit if this particular - transaction is read-only. This allows for simpler - implementation in engines that are always read-only. - */ - if (! ha_info->is_trx_read_write()) - continue; - /* - Sic: we know that prepare() is not NULL since otherwise - trans->no_2pc would have been set. 
- */ - if ((err= ht->prepare(ht, thd, all))) + pthread_mutex_lock(&LOCK_commit_ordered); + for (THD *thd2= queue; thd2 != NULL; thd2= thd2->next_commit_ordered) { - my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); - error= 1; + if (!thd2->xid_error) + call_commit_ordered(ha_info, thd2, thd2->group_commit_all); } - status_var_increment(thd->status_var.ha_prepare_count); + pthread_mutex_unlock(&LOCK_commit_ordered); } - DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_ABORT();); - if (error || (is_real_trans && xid && - (error= !(cookie= tc_log->log_xid(thd, xid))))) - { - ha_rollback_trans(thd, all); - error= 1; - goto end; - } - DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_ABORT();); + pthread_mutex_unlock(&LOCK_group_commit); + + /* Wake up everyone except ourself. */ + while ((queue= queue->next_commit_ordered) != NULL) + group_commit_wakeup_other(queue); + } + else + { + /* If not leader, just wait until leader wakes us up. */ + group_commit_wait_for_wakeup(thd); + } + + /* + Now that we're back in our own thread context, do any delayed error + reporting. + */ + if (thd->xid_error) + { + tc_log->xid_delayed_error(thd); + goto err; + } + cookie= thd->xid_cookie; + /* The cookie must be non-zero in the non-error case. */ + DBUG_ASSERT(cookie); + } + else + { + if (xid) + cookie= tc_log->log_xid(thd, xid); + + if (!need_enqueue) + { + error= commit_one_phase_2(thd, all, TRUE); + DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); + goto end; + } + + /* + We only get here to do correctly sequenced prepare_ordered and + commit_ordered() calls. + + In this case, we need to wait for the previous in queue to finish + commit_ordered before us to get the correct sequence. 
+ */ + DBUG_ASSERT(need_prepare_ordered && need_commit_ordered); + + if (thd->is_commit_ordered_leader) + { + pthread_mutex_lock(&LOCK_group_commit); + group_commit_wait_queue_idle(); + THD *queue= atomic_grab_reverse_queue(); + /* + Mark the queue busy while we bounce it from one thread to the + next. + */ + group_commit_mark_queue_busy(); + pthread_mutex_unlock(&LOCK_group_commit); + + /* The first in the queue is the leader. */ + DBUG_ASSERT(queue == thd); } - error=ha_commit_one_phase(thd, all) ? (cookie ? 2 : 1) : 0; - DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_ABORT();); + else + { + /* If not leader, just wait until previous thread wakes us up. */ + group_commit_wait_for_wakeup(thd); + } + + /* Only run commit_ordered() if log_xid was successful. */ if (cookie) - tc_log->unlog(cookie, xid); - DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); -end: - if (rw_trans) - start_waiting_global_read_lock(thd); + { + pthread_mutex_lock(&LOCK_commit_ordered); + call_commit_ordered(ha_info, thd, all); + pthread_mutex_unlock(&LOCK_commit_ordered); + } + + THD *next= thd->next_commit_ordered; + if (next) + group_commit_wakeup_other(next); + else + group_commit_mark_queue_idle(); + + if (!cookie) + goto err; } - /* Free resources and perform other cleanup even for 'empty' transactions. */ - else if (is_real_trans) - thd->transaction.cleanup(); + + DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_ABORT();); + + error= commit_one_phase_2(thd, all, FALSE) ? 2 : 0; + + DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_ABORT();); + DBUG_ASSERT(cookie); + tc_log->unlog(cookie, xid); + + DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); + goto end; + + /* Come here if error and we need to rollback. 
*/ +err: + if (!error) + error= 1; + ha_rollback_trans(thd, all); + +end: + if (rw_trans) + start_waiting_global_read_lock(thd); #endif /* USING_TRANSACTIONS */ DBUG_RETURN(error); } @@ -1207,6 +1546,17 @@ end: */ int ha_commit_one_phase(THD *thd, bool all) { + /* + When we come here, we did not call handler commit_ordered() methods in + ha_commit_trans() 2-phase commit, so we pass TRUE to do it in + commit_one_phase_2(). + */ + return commit_one_phase_2(thd, all, TRUE); +} + +static int +commit_one_phase_2(THD *thd, bool all, bool do_commit_ordered) +{ int error=0; THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; /* @@ -1218,10 +1568,40 @@ int ha_commit_one_phase(THD *thd, bool a */ bool is_real_trans=all || thd->transaction.all.ha_list == 0; Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; - DBUG_ENTER("ha_commit_one_phase"); + DBUG_ENTER("commit_one_phase_2"); #ifdef USING_TRANSACTIONS if (ha_info) { + if (is_real_trans && do_commit_ordered) + { + /* + If we did not do it already, call any commit_ordered() method. + + Even though we do not need to keep any ordering with other threads + (as there is no prepare or log_xid for this commit), we still need to + do this under the LOCK_commit_ordered mutex to not run in parallel + with other commit_ordered calls. 
+ */ + + bool locked= FALSE; + + for (Ha_trx_info *hi= ha_info; hi; hi= hi->next()) + { + handlerton *ht= hi->ht(); + if (ht->commit_ordered) + { + if (!locked) + { + pthread_mutex_lock(&LOCK_commit_ordered); + locked= 1; + } + ht->commit_ordered(ht, thd, all); + } + } + if (locked) + pthread_mutex_unlock(&LOCK_commit_ordered); + } + for (; ha_info; ha_info= ha_info_next) { int err; Index: work-5.1-groupcommit/sql/handler.h =================================================================== --- work-5.1-groupcommit.orig/sql/handler.h 2010-05-24 16:09:17.000000000 +0200 +++ work-5.1-groupcommit/sql/handler.h 2010-05-24 16:09:32.000000000 +0200 @@ -656,9 +656,81 @@ struct handlerton NOTE 'all' is also false in auto-commit mode where 'end of statement' and 'real commit' mean the same event. */ - int (*commit)(handlerton *hton, THD *thd, bool all); + int (*commit)(handlerton *hton, THD *thd, bool all); + /* + The commit_ordered() method is called prior to the commit() method, after + the transaction manager has decided to commit (not rollback) the + transaction. + + The calls to commit_ordered() in multiple parallel transactions are + guaranteed to happen in the same order in every participating + handler. This can be used to ensure the same commit order among multiple + handlers (e.g. in table handler and binlog). So if transaction T1 calls + into commit_ordered() of handler A before T2, then T1 will also call + commit_ordered() of handler B before T2. + + The intention is that commit_ordered() should do the minimal amount of + work that needs to happen in consistent commit order among handlers. To + preserve ordering, calls need to be serialised on a global mutex, so + doing any time-consuming or blocking operations in commit_ordered() will + limit scalability. + + Handlers can rely on commit_ordered() calls being serialised (no two + calls can run in parallel, so no extra locking on the handler part is + required to ensure this). 
+ + Note that commit_ordered() can be called from a different thread than the + one handling the transaction! So it can not do anything that depends on + thread local storage, in particular it can not call my_error() and + friends (instead it can store the error code and delay the call to + my_error() to the commit() method). + + Similarly, since commit_ordered() returns void, any return error code + must be saved and returned from the commit() method instead. + + commit_ordered() is called only when actually committing a transaction + (autocommit or not), not when ending a statement in the middle of a + transaction. + + The commit_ordered method is optional, and can be left unset if not + needed in a particular handler. + */ + void (*commit_ordered)(handlerton *hton, THD *thd, bool all); int (*rollback)(handlerton *hton, THD *thd, bool all); int (*prepare)(handlerton *hton, THD *thd, bool all); + /* + The prepare_ordered method is optional. If set, it will be called after + successful prepare() in all handlers participating in 2-phase commit. + + The calls to prepare_ordered() among multiple parallel transactions are + ordered consistently with calls to commit_ordered(). This means that + calls to prepare_ordered() effectively define the commit order, and that + each handler will see the same sequence of transactions calling into + prepare_ordered() and commit_ordered(). + + Thus, prepare_ordered() can be used to define commit order for handlers + that need to do this in the prepare step (like binlog). It can also be + used to release transaction locks early in an order consistent with the + order transactions will be eventually committed. + + Like commit_ordered(), prepare_ordered() calls are serialised to maintain + ordering, so the intention is that they should execute fast, with only + the minimal amount of work needed to define commit order. 
Handlers can + rely on this serialisation, and do not need to do any extra locking to + avoid two prepare_ordered() calls running in parallel. + + Unlike commit_ordered(), prepare_ordered() _is_ guaranteed to be called + in the context of the thread handling the rest of the transaction. + + Note that for user-level XA SQL commands, no consistent ordering among + prepare_ordered() and commit_ordered() is guaranteed (as that would + require blocking all other commits for an indefinite time). + + prepare_ordered() is called only when actually committing a transaction + (autocommit or not), not when ending a statement in the middle of a + transaction. + */ + int (*prepare_ordered)(handlerton *hton, THD *thd, bool all); int (*recover)(handlerton *hton, XID *xid_list, uint len); int (*commit_by_xid)(handlerton *hton, XID *xid); int (*rollback_by_xid)(handlerton *hton, XID *xid); Index: work-5.1-groupcommit/sql/sql_class.cc =================================================================== --- work-5.1-groupcommit.orig/sql/sql_class.cc 2010-05-24 16:09:32.000000000 +0200 +++ work-5.1-groupcommit/sql/sql_class.cc 2010-05-24 16:09:32.000000000 +0200 @@ -673,6 +673,8 @@ THD::THD() active_vio = 0; #endif pthread_mutex_init(&LOCK_thd_data, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_FAST); + pthread_cond_init(&COND_commit_ordered, 0); /* Variables with default values */ proc_info="login"; Index: work-5.1-groupcommit/sql/sql_class.h =================================================================== --- work-5.1-groupcommit.orig/sql/sql_class.h 2010-05-24 16:09:17.000000000 +0200 +++ work-5.1-groupcommit/sql/sql_class.h 2010-05-24 16:09:32.000000000 +0200 @@ -1438,6 +1438,23 @@ public: /* container for handler's private per-connection data */ Ha_data ha_data[MAX_HA]; + /* Mutex and condition for waking up threads after group commit. 
*/ + pthread_mutex_t LOCK_commit_ordered; + pthread_cond_t COND_commit_ordered; + bool group_commit_ready; + /* Pointer for linking THDs into queue waiting for group commit. */ + THD *next_commit_ordered; + /* Flag set true for the first THD in the group commit queue. */ + bool is_commit_ordered_leader; + /* + The "all" parameter of commit(), to communicate it to the thread that + calls commit_ordered(). + */ + bool group_commit_all; + /* Set by TC_LOG::group_log_xid(), to return per-thd error and cookie. */ + int xid_error; + int xid_cookie; + #ifndef MYSQL_CLIENT int binlog_setup_trx_data();