Some printouts and checks used while debugging some races. These were used to find the two races caused by accessing thd->next_commit_ordered on the wrong side of the inter-thread communication. --- BUILD/SETUP.sh | 2 - sql/handler.cc | 110 +++++++++++++++++++++++++++++++++++++++++++++++++------ sql/sql_class.cc | 1 3 files changed, 101 insertions(+), 12 deletions(-) Index: work-5.1-groupcommit/BUILD/SETUP.sh =================================================================== --- work-5.1-groupcommit.orig/BUILD/SETUP.sh 2010-06-03 16:06:27.000000000 +0200 +++ work-5.1-groupcommit/BUILD/SETUP.sh 2010-06-03 16:06:42.000000000 +0200 @@ -140,7 +140,7 @@ base_cxxflags="-felide-constructors -fno # # Flags for optimizing builds. # Be as fast as we can be without losing our ability to backtrace. -fast_cflags="-O3 -fno-omit-frame-pointer" +fast_cflags="-O3 -g -fno-omit-frame-pointer" debug_configs="--with-debug$full_debug" if [ -z "$full_debug" ] Index: work-5.1-groupcommit/sql/handler.cc =================================================================== --- work-5.1-groupcommit.orig/sql/handler.cc 2010-06-03 16:06:34.000000000 +0200 +++ work-5.1-groupcommit/sql/handler.cc 2010-06-04 11:38:50.000000000 +0200 @@ -562,6 +562,7 @@ static pthread_mutex_t LOCK_commit_order /* This mutex is used to serialize calls to group_log_xid(). */ static pthread_mutex_t LOCK_group_commit; static pthread_cond_t COND_group_commit; +static pthread_mutex_t LOCK_xxx; static bool mutexes_inited= FALSE; @@ -588,6 +589,8 @@ int ha_init() "LOCK_commit_ordered", MYF(0)); my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW, "LOCK_group_commit", MYF(0)); + my_pthread_mutex_init(&LOCK_xxx, MY_MUTEX_INIT_FAST, + "LOCK_xxx", MYF(0)); pthread_cond_init(&COND_group_commit, 0); mutexes_inited= TRUE; @@ -1097,45 +1100,104 @@ ha_check_and_coalesce_trx_read_only(THD } /* + Check for cyclic list in queue, to debug the infinite loop seen in stress + tests. 
+*/ +static void +check_queue(const THD *me, const THD *first) +{ + const THD *p= first; + const THD *q= p; + + if (!first) + abort(); + + fprintf(stderr, "CHECK(%p):", me); + while (p) + { + fprintf(stderr, " %p", p); + p= p->next_commit_ordered; + if (!p) + break; + fprintf(stderr, " %p", p); + p= p->next_commit_ordered; + q= q->next_commit_ordered; + if (p == q) + abort(); + } + fprintf(stderr, "\n"); +} + +/* Atomically enqueue a THD at the head of the queue of threads waiting to group commit, and return the previous head of the queue. */ static THD * enqueue_atomic(THD *thd) { + THD *orig_queue; + + pthread_mutex_lock(&LOCK_xxx); my_atomic_rwlock_wrlock(&LOCK_group_commit_queue); + thd->next_commit_ordered= group_commit_queue; + do + { + /* + Save the read value of group_commit_queue in each iteration of the loop. + When my_atomic_casptr() returns TRUE, we know that orig_queue is equal + to the value of group_commit_queue when we enqueued. + + However, as soon as we enqueue, thd->next_commit_ordered may be + invalidated by another thread (the group commit leader). So we need to + save the old queue value in a local variable orig_queue like this. 
+ */ + orig_queue= thd->next_commit_ordered; + } while (!my_atomic_casptr((void **)(&group_commit_queue), (void **)(&thd->next_commit_ordered), - thd)) - ; + thd)); my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue); - return thd->next_commit_ordered; + check_queue(thd, thd); + pthread_mutex_unlock(&LOCK_xxx); + + return orig_queue; } static THD * -atomic_grab_reverse_queue() +atomic_grab_reverse_queue(THD *me) { + pthread_mutex_lock(&LOCK_xxx); my_atomic_rwlock_wrlock(&LOCK_group_commit_queue); THD *queue= group_commit_queue; + if (!queue) + abort(); + while (!my_atomic_casptr((void **)(&group_commit_queue), (void **)(&queue), NULL)) ; my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue); + pthread_mutex_unlock(&LOCK_xxx); + + check_queue(me, queue); /* Since we enqueue at the head, the queue is actually in reverse order. So reverse it back into correct commit order before returning. */ THD *prev= NULL; + fprintf(stderr, "QUEUE:"); while (queue) { + fprintf(stderr, " %p", queue); THD *next= queue->next_commit_ordered; queue->next_commit_ordered= prev; prev= queue; queue= next; } + fprintf(stderr, "\n"); + return prev; } @@ -1155,20 +1217,26 @@ call_commit_ordered(Ha_trx_info *ha_info static void group_commit_wait_for_wakeup(THD *thd) { + fprintf(stderr, "#WAKEUP wait(%p)\n", thd); pthread_mutex_lock(&thd->LOCK_commit_ordered); while (!thd->group_commit_ready) pthread_cond_wait(&thd->COND_commit_ordered, &thd->LOCK_commit_ordered); pthread_mutex_unlock(&thd->LOCK_commit_ordered); + fprintf(stderr, "#WAKEUP run(%p)\n", thd); } static void -group_commit_wakeup_other(THD *other_thd) +group_commit_wakeup_other(const THD *me, THD *other_thd) { + fprintf(stderr, "*PROD(me=%p) wait(%p)\n", me, other_thd); pthread_mutex_lock(&other_thd->LOCK_commit_ordered); + if (other_thd->group_commit_ready) + abort(); other_thd->group_commit_ready= TRUE; pthread_cond_signal(&other_thd->COND_commit_ordered); pthread_mutex_unlock(&other_thd->LOCK_commit_ordered); + fprintf(stderr, 
"*PROD(me=%p) do(%p)\n", me, other_thd); } static bool group_commit_queue_busy= 0; @@ -1368,6 +1436,8 @@ int ha_commit_trans(THD *thd, bool all) need_enqueue= (xid && tc_log->use_group_log_xid) || (need_prepare_ordered && need_commit_ordered); + if (!thd->group_commit_ready) + abort(); thd->group_commit_ready= FALSE; thd->group_commit_all= all; if (need_prepare_ordered) @@ -1399,12 +1469,17 @@ int ha_commit_trans(THD *thd, bool all) int cookie; if (tc_log->use_group_log_xid) { + // ToDo: if xid==NULL here, we may use is_group_commit_leader uninitialised. + // ToDo: Same for cookie below when xid==NULL. + // Seems we generally need to check the case xid==NULL. if (is_group_commit_leader) { + fprintf(stderr, "*LEAD(%p) wait\n", thd); pthread_mutex_lock(&LOCK_group_commit); group_commit_wait_queue_idle(); + fprintf(stderr, "*LEAD(%p) run\n", thd); - THD *queue= atomic_grab_reverse_queue(); + THD *queue= atomic_grab_reverse_queue(thd); /* The first in the queue is the leader. */ DBUG_ASSERT(queue == thd); @@ -1434,14 +1509,26 @@ int ha_commit_trans(THD *thd, bool all) } pthread_mutex_unlock(&LOCK_group_commit); - /* Wake up everyone except ourself. */ - while ((queue= queue->next_commit_ordered) != NULL) - group_commit_wakeup_other(queue); + thd->group_commit_ready= TRUE; + /* Wake up everyone except ourself. */ + THD *current= queue->next_commit_ordered; + while (current != NULL) + { + /* + Careful not to access current->next_commit_ordered after waking up + the other thread! As it may change immediately after wakeup. + */ + THD *next= current->next_commit_ordered; + group_commit_wakeup_other(thd, current); + current= next; + } } else { + fprintf(stderr, "#OTHER(%p) wait\n", thd); /* If not leader, just wait until leader wakes us up. 
*/ group_commit_wait_for_wakeup(thd); + fprintf(stderr, "#OTHER(%p) run\n", thd); } /* @@ -1459,6 +1546,7 @@ int ha_commit_trans(THD *thd, bool all) } else { + abort(); if (xid) cookie= tc_log->log_xid(thd, xid); @@ -1482,7 +1570,7 @@ int ha_commit_trans(THD *thd, bool all) { pthread_mutex_lock(&LOCK_group_commit); group_commit_wait_queue_idle(); - THD *queue= atomic_grab_reverse_queue(); + THD *queue= atomic_grab_reverse_queue(thd); /* Mark the queue busy while we bounce it from one thread to the next. @@ -1509,7 +1597,7 @@ int ha_commit_trans(THD *thd, bool all) THD *next= thd->next_commit_ordered; if (next) - group_commit_wakeup_other(next); + group_commit_wakeup_other(thd, next); else group_commit_mark_queue_idle(); Index: work-5.1-groupcommit/sql/sql_class.cc =================================================================== --- work-5.1-groupcommit.orig/sql/sql_class.cc 2010-06-04 08:49:07.000000000 +0200 +++ work-5.1-groupcommit/sql/sql_class.cc 2010-06-04 08:49:24.000000000 +0200 @@ -576,6 +576,7 @@ THD::THD() lock_id(&main_lock_id), user_time(0), in_sub_stmt(0), sql_log_bin_toplevel(false), + group_commit_ready(TRUE), binlog_table_maps(0), binlog_flags(0UL), table_map_for_update(0), arg_of_last_insert_id_function(FALSE),