Move prepare_ordered()/commit_ordered() logic into TC subclasses, preparing for TC plugins that can control commit order. This also cleans up the logic for the two different cases previously distinguished by the flag use_group_log_xid. Now this flag is gone, and instead we have two different abstract subclasses of TC_LOG, one for each case (TC_LOG_unordered for simple multi-threaded log_xid() vs. TC_LOG_group_commit for single-threaded group_log_xid()). This also allows cleaning up the code of each a bit, like not needing mark_queue_busy() and friends in class TC_LOG_group_commit, and generally simplification from not having everything inside ha_commit_trans(). Also move out most stuff from THD. We actually don't need it there, as we can stack-allocate a struct TC_group_commit_entry instead. We still keep a mutex and condition in THD, just so we don't have to re-initialise them at each commit (we don't know of a better place to put them). There is also now a more generic TC interface TC_LOG() with a method log_and_order(). This method has the responsibility of calling prepare_ordered() and commit_ordered(). The idea is that something like Galera could implement this more general method in order to fully control commit order. 
--- sql/handler.cc | 393 +++------------------------------------------------- sql/handler.h | 37 +++- sql/log.cc | 421 ++++++++++++++++++++++++++++++++++++++++++++++++++------ sql/log.h | 175 ++++++++++++++++++++--- sql/mysqld.cc | 3 sql/sql_class.h | 11 - 6 files changed, 589 insertions(+), 451 deletions(-) Index: work-5.1-groupcommit/sql/log.cc =================================================================== --- work-5.1-groupcommit.orig/sql/log.cc 2010-08-24 09:11:00.000000000 +0200 +++ work-5.1-groupcommit/sql/log.cc 2010-08-24 12:31:52.000000000 +0200 @@ -2587,7 +2587,6 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG() index_file_name[0] = 0; bzero((char*) &index_file, sizeof(index_file)); bzero((char*) &purge_index_file, sizeof(purge_index_file)); - use_group_log_xid= TRUE; } /* this is called only once */ @@ -4947,6 +4946,8 @@ MYSQL_BIN_LOG::trx_group_commit_particip bool MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data) { + DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_finish"); + DBUG_PRINT("info", ("trx_data->error=%d\n", trx_data->error)); if (trx_data->error) { switch (trx_data->error) @@ -4977,10 +4978,10 @@ MYSQL_BIN_LOG::trx_group_commit_finish(b if (trx_data->end_event->get_type_code() == XID_EVENT) mark_xid_done(); - return 1; + DBUG_RETURN(1); } - return 0; + DBUG_RETURN(0); } /* @@ -4994,7 +4995,7 @@ MYSQL_BIN_LOG::trx_group_commit_finish(b */ void -MYSQL_BIN_LOG::trx_group_commit_leader(THD *first_thd) +MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) { uint xid_count= 0; uint write_count= 0; @@ -5002,10 +5003,10 @@ MYSQL_BIN_LOG::trx_group_commit_leader(T /* First, put anything from group_log_xid into the queue. 
*/ binlog_trx_data *full_queue= NULL; binlog_trx_data **next_ptr= &full_queue; - for (THD *thd= first_thd; thd; thd= thd->next_commit_ordered) + for (TC_group_commit_entry *entry= first; entry; entry= entry->next) { binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton); /* Skip log_xid for transactions without xid, marked by NULL end_event. */ if (!trx_data->end_event) @@ -5614,6 +5615,369 @@ void sql_print_information(const char *f } +static my_bool mutexes_inited; +pthread_mutex_t LOCK_prepare_ordered; +pthread_mutex_t LOCK_commit_ordered; + +void +TC_init() +{ + my_pthread_mutex_init(&LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW, + "LOCK_prepare_ordered", MYF(0)); + my_pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_SLOW, + "LOCK_commit_ordered", MYF(0)); + mutexes_inited= TRUE; +} + +void +TC_destroy() +{ + if (mutexes_inited) + { + pthread_mutex_destroy(&LOCK_prepare_ordered); + pthread_mutex_destroy(&LOCK_commit_ordered); + mutexes_inited= FALSE; + } +} + +void +TC_LOG::run_prepare_ordered(THD *thd, bool all) +{ + Ha_trx_info *ha_info= + all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + + safe_mutex_assert_owner(&LOCK_prepare_ordered); + for (; ha_info; ha_info= ha_info->next()) + { + handlerton *ht= ha_info->ht(); + if (!ht->prepare_ordered) + continue; + ht->prepare_ordered(ht, thd, all); + } +} + +void +TC_LOG::run_commit_ordered(THD *thd, bool all) +{ + Ha_trx_info *ha_info= + all ? 
thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + + safe_mutex_assert_owner(&LOCK_commit_ordered); + for (; ha_info; ha_info= ha_info->next()) + { + handlerton *ht= ha_info->ht(); + if (!ht->commit_ordered) + continue; + ht->commit_ordered(ht, thd, all); + } +} + +TC_LOG_queued::TC_LOG_queued() : group_commit_queue(NULL) +{ + my_pthread_mutex_init(&LOCK_queue_noatomics, MY_MUTEX_INIT_FAST, + "LOCK_queue_noatomics", MYF(0)); +} + +TC_LOG_queued::~TC_LOG_queued() +{ + pthread_mutex_destroy(&LOCK_queue_noatomics); +} + +/* + Atomically enqueue a THD at the head of the queue of threads waiting to + group commit, and return the previous head of the queue. +*/ +TC_LOG_queued::TC_group_commit_entry * +TC_LOG_queued::enqueue_atomic(TC_group_commit_entry *entry) +{ + my_atomic_rwlock_wrlock(&LOCK_queue_noatomics); + + void *orig_queue= group_commit_queue; + do + { + entry->next= (TC_group_commit_entry *)orig_queue; + } + while (!my_atomic_casptr(&group_commit_queue, &orig_queue, entry)); + + my_atomic_rwlock_wrunlock(&LOCK_queue_noatomics); + + return (TC_group_commit_entry *)orig_queue; +} + +TC_LOG_queued::TC_group_commit_entry * +TC_LOG_queued::atomic_grab_reverse_queue() +{ + my_atomic_rwlock_wrlock(&LOCK_queue_noatomics); + void *queue= group_commit_queue; + while (!my_atomic_casptr(&group_commit_queue, &queue, NULL)) + ; + my_atomic_rwlock_wrunlock(&LOCK_queue_noatomics); + + /* + Since we enqueue at the head, the queue is actually in reverse order. + So reverse it back into correct commit order before returning. 
+ */ + TC_group_commit_entry *entry= (TC_group_commit_entry *)queue; + TC_group_commit_entry *prev= NULL; + while (entry) + { + TC_group_commit_entry *next= entry->next; + entry->next= prev; + prev= entry; + entry= next; + } + + return prev; +} + +void +TC_LOG_queued::group_commit_wait_for_wakeup(TC_group_commit_entry *entry) +{ + THD *thd= entry->thd; + pthread_mutex_lock(&thd->LOCK_commit_ordered); + while (!entry->group_commit_ready) + pthread_cond_wait(&thd->COND_commit_ordered, + &thd->LOCK_commit_ordered); + pthread_mutex_unlock(&thd->LOCK_commit_ordered); +} + +void +TC_LOG_queued::group_commit_wakeup_other(TC_group_commit_entry *other) +{ + THD *thd= other->thd; + pthread_mutex_lock(&thd->LOCK_commit_ordered); + other->group_commit_ready= TRUE; + pthread_cond_signal(&thd->COND_commit_ordered); + pthread_mutex_unlock(&thd->LOCK_commit_ordered); +} + +TC_LOG_unordered::TC_LOG_unordered() : group_commit_queue_busy(0) +{ + my_pthread_mutex_init(&LOCK_queue_busy, MY_MUTEX_INIT_SLOW, + "LOCK_queue_busy", MYF(0)); + pthread_cond_init(&COND_queue_busy, 0); +} + +TC_LOG_unordered::~TC_LOG_unordered() +{ + pthread_cond_destroy(&COND_queue_busy); + pthread_mutex_destroy(&LOCK_queue_busy); +} + +void +TC_LOG_unordered::group_commit_mark_queue_idle() +{ + pthread_mutex_lock(&LOCK_queue_busy); + group_commit_queue_busy= FALSE; + pthread_cond_signal(&COND_queue_busy); + pthread_mutex_unlock(&LOCK_queue_busy); +} + +void +TC_LOG_unordered::group_commit_mark_queue_busy() +{ + safe_mutex_assert_owner(&LOCK_queue_busy); + group_commit_queue_busy= TRUE; +} + +void +TC_LOG_unordered::group_commit_wait_queue_idle() +{ + /* Wait for any existing queue run to finish. 
*/ + safe_mutex_assert_owner(&LOCK_queue_busy); + while (group_commit_queue_busy) + pthread_cond_wait(&COND_queue_busy, &LOCK_queue_busy); +} + +int TC_LOG_unordered::log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, + bool need_commit_ordered) +{ + int cookie; + struct TC_group_commit_entry entry; + bool is_group_commit_leader; + LINT_INIT(is_group_commit_leader); + + if (need_prepare_ordered) + { + pthread_mutex_lock(&LOCK_prepare_ordered); + run_prepare_ordered(thd, all); + if (need_commit_ordered) + { + /* + Must put us in queue so we can run_commit_ordered() in same sequence + as we did run_prepare_ordered(). + */ + entry.thd= thd; + entry.group_commit_ready= false; + TC_group_commit_entry *previous_queue= enqueue_atomic(&entry); + is_group_commit_leader= (previous_queue == NULL); + } + pthread_mutex_unlock(&LOCK_prepare_ordered); + } + + if (xid) + cookie= log_xid(thd, xid); + else + cookie= 0; + + if (need_commit_ordered) + { + if (need_prepare_ordered) + { + /* + We did the run_prepare_ordered() serialised, then ran the log_xid() in + parallel. Now we have to do run_commit_ordered() serialised in the + same sequence as run_prepare_ordered(). + + We do this starting from the head of the queue, each thread doing + run_commit_ordered() and signalling the next in queue. + */ + if (is_group_commit_leader) + { + /* The first in queue starts the ball rolling. */ + pthread_mutex_lock(&LOCK_queue_busy); + group_commit_wait_queue_idle(); + IF_DBUG(TC_group_commit_entry *queue=) atomic_grab_reverse_queue(); + DBUG_ASSERT(queue == &entry && queue->thd == thd); + /* + Mark the queue busy while we bounce it from one thread to the + next. + */ + group_commit_mark_queue_busy(); + pthread_mutex_unlock(&LOCK_queue_busy); + } + else + { + /* Not first in queue; just wait until previous thread wakes us up. */ + group_commit_wait_for_wakeup(&entry); + } + } + + /* Only run commit_ordered() if log_xid was successful. 
*/ + if (cookie) + { + pthread_mutex_lock(&LOCK_commit_ordered); + run_commit_ordered(thd, all); + pthread_mutex_unlock(&LOCK_commit_ordered); + } + + if (need_prepare_ordered) + { + TC_group_commit_entry *next= entry.next; + if (next) + group_commit_wakeup_other(next); + else + group_commit_mark_queue_idle(); + } + } + + return cookie; +} + + +TC_LOG_group_commit::TC_LOG_group_commit() +{ + my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW, + "LOCK_group_commit", MYF(0)); +} + +TC_LOG_group_commit::~TC_LOG_group_commit() +{ + pthread_mutex_destroy(&LOCK_group_commit); +} + +int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, + bool need_commit_ordered) +{ + IF_DBUG(int err;) + int cookie; + struct TC_group_commit_entry entry; + bool is_group_commit_leader; + LINT_INIT(is_group_commit_leader); + + if (need_prepare_ordered) + { + pthread_mutex_lock(&LOCK_prepare_ordered); + run_prepare_ordered(thd, all); + } + + entry.thd= thd; + entry.all= all; + entry.group_commit_ready= false; + entry.xid_error= 0; + TC_group_commit_entry *previous_queue= enqueue_atomic(&entry); + is_group_commit_leader= (previous_queue == NULL); + + if (need_prepare_ordered) + pthread_mutex_unlock(&LOCK_prepare_ordered); + + if (is_group_commit_leader) + { + pthread_mutex_lock(&LOCK_group_commit); + + TC_group_commit_entry *queue= atomic_grab_reverse_queue(); + /* The first in the queue is the leader. */ + DBUG_ASSERT(queue == &entry && queue->thd == thd); + + /* This will set individual error codes in each thd->xid_error. */ + group_log_xid(queue); + + /* + Call commit_ordered methods for all transactions in the queue + (that did not get an error in group_log_xid()). + + We do this under an additional global LOCK_commit_ordered; this is + so that transactions that do not need 2-phase commit do not have + to wait for the potentially long duration of LOCK_group_commit. 
+ */ + if (need_commit_ordered) + { + pthread_mutex_lock(&LOCK_commit_ordered); + for (TC_group_commit_entry *entry2= queue; + entry2 != NULL; + entry2= entry2->next) + { + if (!entry2->xid_error) + run_commit_ordered(entry2->thd, entry2->all); + } + pthread_mutex_unlock(&LOCK_commit_ordered); + } + pthread_mutex_unlock(&LOCK_group_commit); + + /* Wake up everyone except ourself. */ + TC_group_commit_entry *current= queue->next; + while (current != NULL) + { + /* + Careful not to access current->next_commit_ordered after waking up + the other thread! As it may change immediately after wakeup. + */ + TC_group_commit_entry *next= current->next; + group_commit_wakeup_other(current); + current= next; + } + } + else + { + /* If not leader, just wait until leader wakes us up. */ + group_commit_wait_for_wakeup(&entry); + } + + /* + Now that we're back in our own thread context, do any delayed processing + and error reporting. + */ + IF_DBUG(err= entry.xid_error;) + cookie= xid_log_after(&entry); + /* The cookie must be non-zero in the non-error case. */ + DBUG_ASSERT(err || cookie); + + return cookie; +} + + /********* transaction coordinator log for 2pc - mmap() based solution *******/ /* @@ -6216,56 +6580,33 @@ void TC_LOG_BINLOG::close() pthread_cond_destroy (&COND_prep_xids); } -/** - @retval - 0 error - @retval - 1 success -*/ -int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid) -{ - int error; - DBUG_ENTER("TC_LOG_BINLOG::log_xid"); - - thd->next_commit_ordered= 0; - group_log_xid(thd); - if (thd->xid_error) - error= xid_delayed_error(thd); - else - error= 0; - - /* - Note that the return value is inverted: zero on failure, private non-zero - 'cookie' on success. - */ - DBUG_RETURN(!error); -} - /* Do a binlog log_xid() for a group of transactions, linked through thd->next_commit_ordered. 
*/ void -TC_LOG_BINLOG::group_log_xid(THD *first_thd) +TC_LOG_BINLOG::group_log_xid(TC_group_commit_entry *first) { DBUG_ENTER("TC_LOG_BINLOG::group_log_xid"); - trx_group_commit_leader(first_thd); - for (THD *thd= first_thd; thd; thd= thd->next_commit_ordered) + trx_group_commit_leader(first); + for (TC_group_commit_entry *entry= first; entry; entry= entry->next) { binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - thd->xid_error= trx_data->error; - thd->xid_cookie= !trx_data->error; + (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton); + entry->xid_error= trx_data->error; } DBUG_VOID_RETURN; } int -TC_LOG_BINLOG::xid_delayed_error(THD *thd) +TC_LOG_BINLOG::xid_log_after(TC_group_commit_entry *entry) { binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - return trx_group_commit_finish(trx_data); + (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton); + if (trx_group_commit_finish(trx_data)) + return 0; // Returning zero cookie signals error + else + return 1; } /* Index: work-5.1-groupcommit/sql/log.h =================================================================== --- work-5.1-groupcommit.orig/sql/log.h 2010-08-24 09:11:00.000000000 +0200 +++ work-5.1-groupcommit/sql/log.h 2010-08-24 12:24:03.000000000 +0200 @@ -28,19 +28,142 @@ class TC_LOG { public: int using_heuristic_recover(); - /* True if we should use group_log_xid(), false to use log_xid(). 
*/ - bool use_group_log_xid; - TC_LOG() : use_group_log_xid(0) {} + TC_LOG() {} virtual ~TC_LOG() {} virtual int open(const char *opt_name)=0; virtual void close()=0; - virtual int log_xid(THD *thd, my_xid xid)=0; + virtual int log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, + bool need_commit_ordered) = 0; virtual void unlog(ulong cookie, my_xid xid)=0; + +protected: /* - If use_group_log_xid is true, then this method is used instead of - log_xid() to do logging of a group of transactions all at once. + These methods are meant to be invoked from log_and_order() implementations + to run any prepare_ordered() respectively commit_ordered() methods in + participating handlers. + + They must be called using suitable thread syncronisation to ensure that + they are each called in the correct commit order among all + transactions. However, it is only necessary to call them if the + corresponding flag passed to log_and_order is set (it is safe, but not + required, to call them when the flag is false). + + The caller must be holding LOCK_prepare_ordered respectively + LOCK_commit_ordered when calling these methods. + */ + void run_prepare_ordered(THD *thd, bool all); + void run_commit_ordered(THD *thd, bool all); +}; + +/* + Locks used to ensure serialised execution of TC_LOG::run_prepare_ordered() + and TC_LOG::run_commit_ordered(), or any other code that calls handler + prepare_ordered() or commit_ordered() methods. +*/ +extern pthread_mutex_t LOCK_prepare_ordered; +extern pthread_mutex_t LOCK_commit_ordered; + +extern void TC_init(); +extern void TC_destroy(); + +/* + Base class for two TC implementations TC_LOG_unordered and + TC_LOG_group_commit that both use a queue of threads waiting for group + commit. +*/ +class TC_LOG_queued: public TC_LOG +{ +protected: + TC_LOG_queued(); + ~TC_LOG_queued(); + + /* Structure used to link list of THDs waiting for group commit. 
*/ + struct TC_group_commit_entry + { + struct TC_group_commit_entry *next; + THD *thd; + /* This is the `all' parameter for ha_commit_trans() etc. */ + bool all; + /* + Flag set true when it is time for this thread to wake up after group + commit. Used with THD::LOCK_commit_ordered and THD::COND_commit_ordered. + */ + bool group_commit_ready; + /* + Set by TC_LOG_group_commit::group_log_xid(), to return per-thd error and + cookie. + */ + int xid_error; + }; + + TC_group_commit_entry * enqueue_atomic(TC_group_commit_entry *entry); + TC_group_commit_entry * atomic_grab_reverse_queue(); + + void group_commit_wait_for_wakeup(TC_group_commit_entry *entry); + void group_commit_wakeup_other(TC_group_commit_entry *other); + +private: + /* + This is a queue of threads waiting for being allowed to commit. + */ + void *group_commit_queue; + /* + This mutex protects the group_commit_queue on platforms without native + atomic operations. + */ + pthread_mutex_t LOCK_queue_noatomics; + +}; + +class TC_LOG_unordered: public TC_LOG_queued +{ +public: + TC_LOG_unordered(); + ~TC_LOG_unordered(); + + int log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, bool need_commit_ordered); + +protected: + virtual int log_xid(THD *thd, my_xid xid)=0; + +private: + /* + This flag, with associated mutex and condition, is used to reserve the + queue while threads in it each run the commit_ordered() methods one after + the other. Only once the last commit_ordered() in the queue is done can we + start on a new queue run. + + Since we start this process in the first thread in the queue and finish in + the last (and possibly different) thread, we need a condition variable for + this (we cannot unlock a mutex in a different thread than the one who + locked it). 
+ */ + my_bool group_commit_queue_busy; + pthread_mutex_t LOCK_queue_busy; + pthread_cond_t COND_queue_busy; + + void group_commit_mark_queue_idle(); + void group_commit_mark_queue_busy(); + void group_commit_wait_queue_idle(); +}; + +class TC_LOG_group_commit: public TC_LOG_queued +{ +public: + TC_LOG_group_commit(); + ~TC_LOG_group_commit(); + + int log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, bool need_commit_ordered); + +protected: + /* + When using this class, this method is used instead of log_xid() to do + logging of a group of transactions all at once. The transactions will be linked through THD::next_commit_ordered. @@ -54,26 +177,35 @@ class TC_LOG commit order in InnoDB and binary log). For TCs that do not need this, it can be preferable to use plain log_xid() - instead, as it allows threads to run log_xid() in parallel with each - other. In contrast, group_log_xid() runs under a global mutex, so it is - guaranteed that only once call into it will be active at once. + with class TC_LOG_unordered instead, as it allows threads to run log_xid() + in parallel with each other. In contrast, group_log_xid() runs under a + global mutex, so it is guaranteed that only once call into it will be + active at once. Since this call handles multiple threads/THDs at once, my_error() (and other code that relies on thread local storage) cannot be used in this - method. Instead, in case of error, thd->xid_error should be set to the - error code, and xid_delayed_error() will be called later in the correct - thread context to actually report the error. + method. Instead, the implementation must record any error and report it as + the return value from xid_log_after(), which will be invoked individually + for each thread. In the success case, this method must set thd->xid_cookie for each thread to the cookie that is normally returned from log_xid() (which must be non-zero in the non-error case). 
*/ - virtual void group_log_xid(THD *first_thd) { DBUG_ASSERT(FALSE); } - /* Error reporting for group_log_xid(). */ - virtual int xid_delayed_error(THD *thd) { DBUG_ASSERT(FALSE); return 0; } + virtual void group_log_xid(TC_group_commit_entry *first) = 0; + /* + Called for each transaction (in corrent thread context) after + group_log_xid() has finished, but with no guarantee on ordering among + threads. + Can be used to do error reporting etc. */ + virtual int xid_log_after(TC_group_commit_entry *entry) = 0; + +private: + /* Mutex used to serialise calls to group_log_xid(). */ + pthread_mutex_t LOCK_group_commit; }; -class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging +class TC_LOG_DUMMY: public TC_LOG_unordered // use it to disable the logging { public: TC_LOG_DUMMY() {} @@ -84,7 +216,7 @@ public: }; #ifdef HAVE_MMAP -class TC_LOG_MMAP: public TC_LOG +class TC_LOG_MMAP: public TC_LOG_unordered { public: // only to keep Sun Forte on sol9x86 happy typedef enum { @@ -264,7 +396,7 @@ private: }; class binlog_trx_data; -class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG +class MYSQL_BIN_LOG: public TC_LOG_group_commit, private MYSQL_LOG { private: /* LOCK_log and LOCK_index are inited by init_pthread_objects() */ @@ -328,7 +460,7 @@ class MYSQL_BIN_LOG: public TC_LOG, priv int write_transaction(binlog_trx_data *trx_data); bool write_transaction_to_binlog_events(binlog_trx_data *trx_data); void trx_group_commit_participant(binlog_trx_data *trx_data); - void trx_group_commit_leader(THD *first_thd); + void trx_group_commit_leader(TC_group_commit_entry *first); binlog_trx_data *atomic_enqueue_trx(binlog_trx_data *trx_data); binlog_trx_data *atomic_grab_trx_queue(); void mark_xid_done(); @@ -361,9 +493,8 @@ public: int open(const char *opt_name); void close(); - int log_xid(THD *thd, my_xid xid); - int xid_delayed_error(THD *thd); - void group_log_xid(THD *first_thd); + void group_log_xid(TC_group_commit_entry *first); + int 
xid_log_after(TC_group_commit_entry *entry); void unlog(ulong cookie, my_xid xid); int recover(IO_CACHE *log, Format_description_log_event *fdle); #if !defined(MYSQL_CLIENT) Index: work-5.1-groupcommit/sql/sql_class.h =================================================================== --- work-5.1-groupcommit.orig/sql/sql_class.h 2010-08-24 09:11:00.000000000 +0200 +++ work-5.1-groupcommit/sql/sql_class.h 2010-08-24 09:11:00.000000000 +0200 @@ -1441,17 +1441,6 @@ public: /* Mutex and condition for waking up threads after group commit. */ pthread_mutex_t LOCK_commit_ordered; pthread_cond_t COND_commit_ordered; - bool group_commit_ready; - /* Pointer for linking THDs into queue waiting for group commit. */ - THD *next_commit_ordered; - /* - The "all" parameter of commit(), to communicate it to the thread that - calls commit_ordered(). - */ - bool group_commit_all; - /* Set by TC_LOG::group_log_xid(), to return per-thd error and cookie. */ - int xid_error; - int xid_cookie; #ifndef MYSQL_CLIENT int binlog_setup_trx_data(); Index: work-5.1-groupcommit/sql/handler.cc =================================================================== --- work-5.1-groupcommit.orig/sql/handler.cc 2010-08-24 09:11:00.000000000 +0200 +++ work-5.1-groupcommit/sql/handler.cc 2010-08-24 09:11:00.000000000 +0200 @@ -76,7 +76,8 @@ TYPELIB tx_isolation_typelib= {array_ele static TYPELIB known_extensions= {0,"known_exts", NULL, NULL}; uint known_extensions_id= 0; -static int commit_one_phase_2(THD *thd, bool all, bool do_commit_ordered); +static int commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, + bool is_real_trans); static plugin_ref ha_default_plugin(THD *thd) @@ -545,26 +546,6 @@ err: DBUG_RETURN(1); } -/* - This is a queue of THDs waiting for being group committed with - tc_log->group_log_xid(). -*/ -static THD *group_commit_queue; -/* - This mutex protects the group_commit_queue on platforms without native - atomic operations. 
- */ -static pthread_mutex_t LOCK_group_commit_queue; -/* This mutex is used to serialize calls to handler prepare_ordered methods. */ -static pthread_mutex_t LOCK_prepare_ordered; -/* This mutex is used to serialize calls to handler commit_ordered methods. */ -static pthread_mutex_t LOCK_commit_ordered; -/* This mutex is used to serialize calls to group_log_xid(). */ -static pthread_mutex_t LOCK_group_commit; -static pthread_cond_t COND_group_commit; - -static bool mutexes_inited= FALSE; - int ha_init() { int error= 0; @@ -578,19 +559,6 @@ int ha_init() */ opt_using_transactions= total_ha>(ulong)opt_bin_log; savepoint_alloc_size+= sizeof(SAVEPOINT); - - group_commit_queue= NULL; - my_pthread_mutex_init(&LOCK_group_commit_queue, MY_MUTEX_INIT_FAST, - "LOCK_group_commit_queue", MYF(0)); - my_pthread_mutex_init(&LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW, - "LOCK_prepare_ordered", MYF(0)); - my_pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_SLOW, - "LOCK_commit_ordered", MYF(0)); - my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW, - "LOCK_group_commit", MYF(0)); - pthread_cond_init(&COND_group_commit, 0); - mutexes_inited= TRUE; - DBUG_RETURN(error); } @@ -608,15 +576,6 @@ int ha_end() if (ha_finish_errors()) error= 1; - if (mutexes_inited) - { - pthread_mutex_destroy(&LOCK_group_commit_queue); - pthread_mutex_destroy(&LOCK_prepare_ordered); - pthread_mutex_destroy(&LOCK_commit_ordered); - pthread_mutex_destroy(&LOCK_group_commit); - mutexes_inited= FALSE; - } - DBUG_RETURN(error); } @@ -1096,112 +1055,6 @@ ha_check_and_coalesce_trx_read_only(THD return rw_ha_count; } -/* - Atomically enqueue a THD at the head of the queue of threads waiting to - group commit, and return the previous head of the queue. 
-*/ -static THD * -enqueue_atomic(THD *thd) -{ - my_atomic_rwlock_wrlock(&LOCK_group_commit_queue); - THD *orig_queue= group_commit_queue; - do - { - thd->next_commit_ordered= orig_queue; - } - while (!my_atomic_casptr((void **)(&group_commit_queue), - (void **)(&orig_queue), - thd)); - my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue); - - return orig_queue; -} - -static THD * -atomic_grab_reverse_queue() -{ - my_atomic_rwlock_wrlock(&LOCK_group_commit_queue); - THD *queue= group_commit_queue; - while (!my_atomic_casptr((void **)(&group_commit_queue), - (void **)(&queue), - NULL)) - ; - my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue); - - /* - Since we enqueue at the head, the queue is actually in reverse order. - So reverse it back into correct commit order before returning. - */ - THD *prev= NULL; - while (queue) - { - THD *next= queue->next_commit_ordered; - queue->next_commit_ordered= prev; - prev= queue; - queue= next; - } - - return prev; -} - -static void -call_commit_ordered(Ha_trx_info *ha_info, THD *thd, bool all) -{ - for (; ha_info; ha_info= ha_info->next()) - { - handlerton *ht= ha_info->ht(); - if (!ht->commit_ordered) - continue; - ht->commit_ordered(ht, thd, all); - } -} - -static void -group_commit_wait_for_wakeup(THD *thd) -{ - pthread_mutex_lock(&thd->LOCK_commit_ordered); - while (!thd->group_commit_ready) - pthread_cond_wait(&thd->COND_commit_ordered, - &thd->LOCK_commit_ordered); - pthread_mutex_unlock(&thd->LOCK_commit_ordered); -} - -static void -group_commit_wakeup_other(THD *other_thd) -{ - pthread_mutex_lock(&other_thd->LOCK_commit_ordered); - other_thd->group_commit_ready= TRUE; - pthread_cond_signal(&other_thd->COND_commit_ordered); - pthread_mutex_unlock(&other_thd->LOCK_commit_ordered); -} - -static bool group_commit_queue_busy= 0; - -static void -group_commit_mark_queue_idle() -{ - pthread_mutex_lock(&LOCK_group_commit); - group_commit_queue_busy= FALSE; - pthread_cond_signal(&COND_group_commit); - 
pthread_mutex_unlock(&LOCK_group_commit); -} - -static void -group_commit_mark_queue_busy() -{ - safe_mutex_assert_owner(&LOCK_group_commit); - group_commit_queue_busy= TRUE; -} - -static void -group_commit_wait_queue_idle() -{ - /* Wait for any existing queue run to finish. */ - safe_mutex_assert_owner(&LOCK_group_commit); - while (group_commit_queue_busy) - pthread_cond_wait(&COND_group_commit, &LOCK_group_commit); -} - /** @retval @@ -1219,7 +1072,7 @@ group_commit_wait_queue_idle() */ int ha_commit_trans(THD *thd, bool all) { - int error= 0; + int error= 0, cookie; /* 'all' means that this is either an explicit commit issued by user, or an implicit commit issued by a DDL. @@ -1235,8 +1088,6 @@ int ha_commit_trans(THD *thd, bool all) bool is_real_trans= all || thd->transaction.all.ha_list == 0; Ha_trx_info *ha_info= trans->ha_list; bool need_prepare_ordered, need_commit_ordered; - bool need_enqueue; - bool is_group_commit_leader; my_xid xid; DBUG_ENTER("ha_commit_trans"); @@ -1346,199 +1197,22 @@ int ha_commit_trans(THD *thd, bool all) if (!is_real_trans) { - error= commit_one_phase_2(thd, all, FALSE); + error= commit_one_phase_2(thd, all, trans, is_real_trans); DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); goto end; } - /* - We can optimise away some of the thread synchronisation that may not be - needed. - - If need_prepare_ordered, then we need to take LOCK_prepare_ordered. - - If (xid && use_group_log_xid), then we need to enqueue (and this must - be done under LOCK_prepare_ordered if we take that lock). - - Similarly, if (need_prepare_ordered && need_commit_ordered), then we - need to enqueue under the LOCK_prepare_ordered. - - If (xid && use_group_log_xid), then we need to take LOCK_group_commit. - - If need_commit_ordered, then we need to take LOCK_commit_ordered. - - Cases not covered by above can be skipped to optimise things a bit. 
- */ - need_enqueue= (xid && tc_log->use_group_log_xid) || - (need_prepare_ordered && need_commit_ordered); - - thd->group_commit_ready= FALSE; - thd->group_commit_all= all; - if (need_prepare_ordered) - { - pthread_mutex_lock(&LOCK_prepare_ordered); - - for (Ha_trx_info *hi= ha_info; hi; hi= hi->next()) - { - int err; - handlerton *ht= hi->ht(); - if (! hi->is_trx_read_write()) - continue; - if (ht->prepare_ordered && (err= ht->prepare_ordered(ht, thd, all))) - { - my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); - pthread_mutex_unlock(&LOCK_prepare_ordered); - goto err; - } - } - } - if (need_enqueue) - { - THD *previous_queue= enqueue_atomic(thd); - is_group_commit_leader= (previous_queue == NULL); - } - if (need_prepare_ordered) - pthread_mutex_unlock(&LOCK_prepare_ordered); - - int cookie; - if (tc_log->use_group_log_xid) - { - // ToDo: if xid==NULL here, we may use is_group_commit_leader uninitialised. - // ToDo: Same for cookie below when xid==NULL. - // Seems we generally need to check the case xid==NULL. - if (is_group_commit_leader) - { - pthread_mutex_lock(&LOCK_group_commit); - group_commit_wait_queue_idle(); - - THD *queue= atomic_grab_reverse_queue(); - /* The first in the queue is the leader. */ - DBUG_ASSERT(queue == thd); - - /* - This will set individual error codes in each thd->xid_error, as - well as set thd->xid_cookie for later unlog() call. - */ - tc_log->group_log_xid(queue); - - /* - Call commit_ordered methods for all transactions in the queue - (that did not get an error in group_log_xid()). - - We do this under an additional global LOCK_commit_ordered; this is - so that transactions that do not need 2-phase commit do not have - to wait for the potentially long duration of LOCK_group_commit. 
- */ - if (need_commit_ordered) - { - pthread_mutex_lock(&LOCK_commit_ordered); - for (THD *thd2= queue; thd2 != NULL; thd2= thd2->next_commit_ordered) - { - if (!thd2->xid_error) - call_commit_ordered(ha_info, thd2, thd2->group_commit_all); - } - pthread_mutex_unlock(&LOCK_commit_ordered); - } - pthread_mutex_unlock(&LOCK_group_commit); - - /* Wake up everyone except ourself. */ - THD *current= queue->next_commit_ordered; - while (current != NULL) - { - /* - Careful not to access current->next_commit_ordered after waking up - the other thread! As it may change immediately after wakeup. - */ - THD *next= current->next_commit_ordered; - group_commit_wakeup_other(current); - current= next; - } - } - else - { - /* If not leader, just wait until leader wakes us up. */ - group_commit_wait_for_wakeup(thd); - } - - /* - Now that we're back in our own thread context, do any delayed error - reporting. - */ - if (thd->xid_error) - { - tc_log->xid_delayed_error(thd); - goto err; - } - cookie= thd->xid_cookie; - /* The cookie must be non-zero in the non-error case. */ - DBUG_ASSERT(cookie); - } - else - { - if (xid) - cookie= tc_log->log_xid(thd, xid); - - if (!need_enqueue) - { - error= commit_one_phase_2(thd, all, TRUE); - DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); - goto end; - } - - /* - We only get here to do correctly sequenced prepare_ordered and - commit_ordered() calls. - - In this case, we need to wait for the previous in queue to finish - commit_ordered before us to get the correct sequence. - */ - DBUG_ASSERT(need_prepare_ordered && need_commit_ordered); - - if (is_group_commit_leader) - { - pthread_mutex_lock(&LOCK_group_commit); - group_commit_wait_queue_idle(); - THD *queue= atomic_grab_reverse_queue(); - /* - Mark the queue busy while we bounce it from one thread to the - next. - */ - group_commit_mark_queue_busy(); - pthread_mutex_unlock(&LOCK_group_commit); - - /* The first in the queue is the leader. 
*/ - DBUG_ASSERT(queue == thd); - } - else - { - /* If not leader, just wait until previous thread wakes us up. */ - group_commit_wait_for_wakeup(thd); - } - - /* Only run commit_ordered() if log_xid was successful. */ - if (cookie) - { - pthread_mutex_lock(&LOCK_commit_ordered); - call_commit_ordered(ha_info, thd, all); - pthread_mutex_unlock(&LOCK_commit_ordered); - } - - THD *next= thd->next_commit_ordered; - if (next) - group_commit_wakeup_other(next); - else - group_commit_mark_queue_idle(); - - if (!cookie) - goto err; - } + cookie= tc_log->log_and_order(thd, xid, all, need_prepare_ordered, + need_commit_ordered); + if (!cookie) + goto err; DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_ABORT();); - error= commit_one_phase_2(thd, all, FALSE) ? 2 : 0; + error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0; + DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_ABORT();); - DBUG_ASSERT(cookie); tc_log->unlog(cookie, xid); DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); @@ -1563,18 +1237,6 @@ end: */ int ha_commit_one_phase(THD *thd, bool all) { - /* - When we come here, we did not call handler commit_ordered() methods in - ha_commit_trans() 2-phase commit, so we pass TRUE to do it in - commit_one_phase_2(). - */ - return commit_one_phase_2(thd, all, TRUE); -} - -static int -commit_one_phase_2(THD *thd, bool all, bool do_commit_ordered) -{ - int error=0; THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; /* "real" is a nick name for a transaction for which a commit will @@ -1584,27 +1246,17 @@ commit_one_phase_2(THD *thd, bool all, b enclosing 'all' transaction is rolled back. 
*/ bool is_real_trans=all || thd->transaction.all.ha_list == 0; - Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; - DBUG_ENTER("commit_one_phase_2"); + Ha_trx_info *ha_info= trans->ha_list; + DBUG_ENTER("ha_commit_one_phase"); #ifdef USING_TRANSACTIONS if (ha_info) { - if (is_real_trans && do_commit_ordered) + if (is_real_trans) { - /* - If we did not do it already, call any commit_ordered() method. - - Even though we do not need to keep any ordering with other threads - (as there is no prepare or log_xid for this commit), we still need to - do this under the LOCK_commit_ordered mutex to not run in parallel - with other commit_ordered calls. - */ - - bool locked= FALSE; - - for (Ha_trx_info *hi= ha_info; hi; hi= hi->next()) + bool locked= false; + for (; ha_info; ha_info= ha_info->next()) { - handlerton *ht= hi->ht(); + handlerton *ht= ha_info->ht(); if (ht->commit_ordered) { if (!locked) @@ -1618,7 +1270,20 @@ commit_one_phase_2(THD *thd, bool all, b if (locked) pthread_mutex_unlock(&LOCK_commit_ordered); } + } +#endif /* USING_TRANSACTIONS */ + DBUG_RETURN(commit_one_phase_2(thd, all, trans, is_real_trans)); +} +static int +commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) +{ + int error= 0; + Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; + DBUG_ENTER("commit_one_phase_2"); +#ifdef USING_TRANSACTIONS + if (ha_info) + { for (; ha_info; ha_info= ha_info_next) { int err; Index: work-5.1-groupcommit/sql/handler.h =================================================================== --- work-5.1-groupcommit.orig/sql/handler.h 2010-08-24 09:11:00.000000000 +0200 +++ work-5.1-groupcommit/sql/handler.h 2010-08-24 09:11:00.000000000 +0200 @@ -660,7 +660,9 @@ struct handlerton /* The commit_ordered() method is called prior to the commit() method, after the transaction manager has decided to commit (not rollback) the - transaction. + transaction. 
Unlike commit(), commit_ordered() is called only when the + full transaction is committed, not for each commit of statement + transaction (so there is no ALL parameter). The calls to commit_ordered() in multiple parallel transactions is guaranteed to happen in the same order in every participating @@ -682,16 +684,12 @@ struct handlerton Note that commit_ordered() can be called from a different thread than the one handling the transaction! So it can not do anything that depends on thread local storage, in particular it can not call my_error() and - friends (instead it can store the error code and delay the call to + friends (instead it can store the error code and delay the call of my_error() to the commit() method). Similarly, since commit_ordered() returns void, any return error code must be saved and returned from the commit() method instead. - commit_ordered() is called only when actually committing a transaction - (autocommit or not), not when ending a statement in the middle of a - transaction. - The commit_ordered method is optional, and can be left unset if not needed in a particular handler. */ @@ -700,7 +698,9 @@ struct handlerton int (*prepare)(handlerton *hton, THD *thd, bool all); /* The prepare_ordered method is optional. If set, it will be called after - successful prepare() in all handlers participating in 2-phase commit. + successful prepare() in all handlers participating in 2-phase + commit. Like commit_ordered(), it is called only when the full + transaction is committed, not for each commit of statement transaction. The calls to prepare_ordered() among multiple parallel transactions are ordered consistently with calls to commit_ordered(). This means that @@ -710,7 +710,7 @@ struct handlerton Thus, prepare_ordered() can be used to define commit order for handlers that need to do this in the prepare step (like binlog). 
It can also be - used to release transactions locks early in an order consistent with the + used to release transaction's locks early in an order consistent with the order transactions will be eventually committed. Like commit_ordered(), prepare_ordered() calls are serialised to maintain @@ -719,18 +719,27 @@ struct handlerton rely on this serialisation, and do not need to do any extra locking to avoid two prepare_ordered() calls running in parallel. - Unlike commit_ordered(), prepare_ordered() _is_ guaranteed to be called - in the context of the thread handling the rest of the transaction. + Like commit_ordered(), prepare_ordered() is not guaranteed to be called + in the context of the thread handling the rest of the transaction. So it + cannot invoke code that relies on thread local storage, in particular it + cannot call my_error(). + + When prepare_ordered() is called, the transaction coordinator has already + decided to commit (not rollback) the transaction. So prepare_ordered() + cannot cause a rollback by returning an error, all possible errors must + be handled in prepare() (the prepare_ordered() method returns void). In + case of some fatal error, a record of the error must be made internally + by the engine and returned from commit() later. Note that for user-level XA SQL commands, no consistent ordering among prepare_ordered() and commit_ordered() is guaranteed (as that would require blocking all other commits for an indefinite time). - prepare_ordered() is called only when actually committing a transaction - (autocommit or not), not when ending a statement in the middle of a - transaction. + When 2-phase commit is not used (eg. only one engine (and no binlog) in + transaction), prepare() is not called and in such cases prepare_ordered() + also is not called. 
*/ - int (*prepare_ordered)(handlerton *hton, THD *thd, bool all); + void (*prepare_ordered)(handlerton *hton, THD *thd, bool all); int (*recover)(handlerton *hton, XID *xid_list, uint len); int (*commit_by_xid)(handlerton *hton, XID *xid); int (*rollback_by_xid)(handlerton *hton, XID *xid); Index: work-5.1-groupcommit/sql/mysqld.cc =================================================================== --- work-5.1-groupcommit.orig/sql/mysqld.cc 2010-08-24 09:04:50.000000000 +0200 +++ work-5.1-groupcommit/sql/mysqld.cc 2010-08-24 09:11:00.000000000 +0200 @@ -1333,6 +1333,7 @@ void clean_up(bool print_message) ha_end(); if (tc_log) tc_log->close(); + TC_destroy(); xid_cache_free(); wt_end(); delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache); @@ -4124,6 +4125,8 @@ a file name for --log-bin-index option", if (!errmesg[0][0]) unireg_abort(1); + TC_init(); + /* We have to initialize the storage engines before CSV logging */ if (ha_init()) {