Another temporary patch for debugging a cyclic queue race. This one uses a
cyclic memory buffer for logging, as the issue could not be reproduced with
printf debugging. This might be useful for other debugging sessions.

The issue turned out to be caused by a bug in my_atomic_casptr(val,cmp,new):
it accessed *cmp after the CAS had already assigned *val, which breaks the
contract and corrupts the queue used by the group commit algorithm. (A
standalone sketch of the broken versus fixed CAS body follows after the
patch.)
---
 include/atomic/gcc_builtins.h |    5 +-
 include/atomic/x86-gcc.h      |   23 +++++++---
 sql/handler.cc                |   95 ++++++++++++++++++++++++++++++++++++++++--
 sql/log.cc                    |    1 +
 4 files changed, 114 insertions(+), 10 deletions(-)

Index: work-5.1-groupcommit/include/atomic/gcc_builtins.h
===================================================================
--- work-5.1-groupcommit.orig/include/atomic/gcc_builtins.h	2010-06-09 00:23:08.000000000 +0200
+++ work-5.1-groupcommit/include/atomic/gcc_builtins.h	2010-06-09 10:36:36.000000000 +0200
@@ -19,8 +19,9 @@
   v= __sync_lock_test_and_set(a, v);
 #define make_atomic_cas_body(S)                     \
   int ## S sav;                                     \
-  sav= __sync_val_compare_and_swap(a, *cmp, set);   \
-  if (!(ret= (sav == *cmp))) *cmp= sav;
+  int ## S cmp_val= *cmp;                           \
+  sav= __sync_val_compare_and_swap(a, cmp_val, set);\
+  if (!(ret= (sav == cmp_val))) *cmp= sav
 
 #ifdef MY_ATOMIC_MODE_DUMMY
 #define make_atomic_load_body(S)   ret= *a
Index: work-5.1-groupcommit/include/atomic/x86-gcc.h
===================================================================
--- work-5.1-groupcommit.orig/include/atomic/x86-gcc.h	2010-06-09 10:22:19.000000000 +0200
+++ work-5.1-groupcommit/include/atomic/x86-gcc.h	2010-06-09 10:41:03.000000000 +0200
@@ -38,15 +38,28 @@
 #define asm __asm__
 #endif
 
+/*
+  Note that the atomic operations have "memory" in the clobber list.
+
+  The reason is that they include memory barriers, which make local writes
+  visible to other threads, and vice versa. Thus as soon as the instruction
+  has run, other threads may step in and change data that is aliased by some
+  pointer accessible in this thread. The "memory" clobber handles this in a
+  clean way.
+*/
 #ifndef MY_ATOMIC_NO_XADD
 #define make_atomic_add_body(S)                                 \
-  asm volatile (LOCK_prefix "; xadd %0, %1;" : "+r" (v) , "+m" (*a))
+  asm volatile (LOCK_prefix "; xadd %0, %1;" : "+r" (v) , "+m" (*a) : : "memory")
 #endif
 #define make_atomic_fas_body(S)                                 \
-  asm volatile ("xchg %0, %1;" : "+r" (v) , "+m" (*a))
+  asm volatile ("xchg %0, %1;" : "+r" (v) , "+m" (*a) : : "memory")
 #define make_atomic_cas_body(S)                                 \
+  int ## S sav;                                                 \
   asm volatile (LOCK_prefix "; cmpxchg %3, %0; setz %2;"        \
-               : "+m" (*a), "+a" (*cmp), "=q" (ret): "r" (set))
+               : "+m" (*a), "=a" (sav), "=q" (ret)              \
+               : "r" (set), "a" (*cmp) : "memory");             \
+  if (!ret)                                                     \
+    *cmp= sav
 
 #ifdef MY_ATOMIC_MODE_DUMMY
 #define make_atomic_load_body(S)   ret=*a
@@ -59,9 +72,9 @@
 #define make_atomic_load_body(S)                                \
   ret=0;                                                        \
   asm volatile (LOCK_prefix "; cmpxchg %2, %0"                  \
-               : "+m" (*a), "+a" (ret): "r" (ret))
+               : "+m" (*a), "+a" (ret) : "r" (ret) : "memory")
 #define make_atomic_store_body(S)                               \
-  asm volatile ("; xchg %0, %1;" : "+m" (*a), "+r" (v))
+  asm volatile ("; xchg %0, %1;" : "+m" (*a), "+r" (v) : : "memory")
 #endif
 
 /* TODO test on intel whether the below helps.
    on AMD it makes no difference */
Index: work-5.1-groupcommit/sql/handler.cc
===================================================================
--- work-5.1-groupcommit.orig/sql/handler.cc	2010-06-04 13:53:01.000000000 +0200
+++ work-5.1-groupcommit/sql/handler.cc	2010-06-09 00:41:32.000000000 +0200
@@ -1096,6 +1096,73 @@ ha_check_and_coalesce_trx_read_only(THD
   return rw_ha_count;
 }
 
+static struct {
+  uint32_t type;
+  uint32_t sub;
+  uint64_t my_thd;
+  uint64_t other_thd;
+} cyclic_buffer[2048];
+static volatile int32 cyclic_index= 0;
+
+/*
+  Some logging that (hopefully) doesn't prevent the repro while still giving
+  info to debug.
+
+  Types:
+
+    1  check_queue, my_thd is the checker.
+    2  grab queue start, my_thd is the grabber
+    3  grab queue end, my_thd is the grabber, other_thd is the grabbed queue,
+       sub is the queue length
+    4  enqueue start, my_thd is the thd enqueued
+    5  enqueue end, my_thd is the thd enqueued, other_thd is orig_queue,
+       sub is the cas count
+*/
+static inline void
+cyclic_buffer_enqueue(uint32_t type, uint32_t sub, THD *my_thd, THD *other_thd)
+{
+  uint32_t my_index= my_atomic_add32(&cyclic_index, 1);
+  uint32_t i= my_index % (sizeof(cyclic_buffer)/sizeof(cyclic_buffer[0]));
+  cyclic_buffer[i].type= type;
+  cyclic_buffer[i].sub= sub;
+  cyclic_buffer[i].my_thd= (uint64_t)my_thd;
+  cyclic_buffer[i].other_thd= (uint64_t)other_thd;
+  /* Clear old entries, with margin to avoid races confusing things. */
+  i= (i + (sizeof(cyclic_buffer)/sizeof(cyclic_buffer[0]))*4/5)
+    % (sizeof(cyclic_buffer)/sizeof(cyclic_buffer[0]));
+  cyclic_buffer[i].type= 0;
+  cyclic_buffer[i].sub= 0;
+  cyclic_buffer[i].my_thd= 0;
+  cyclic_buffer[i].other_thd= 0;
+}
+
+/*
+  Check for a cyclic list in the queue, to debug the infinite loop seen in
+  stress tests.
+*/
+static void
+check_queue(const THD *me, const THD *first)
+{
+  const THD *p= first;
+  const THD *q= p;
+
+  if (!first)
+    abort();
+
+  //fprintf(stderr, "CHECK(%p):", me);
+  while (p)
+  {
+    //fprintf(stderr, " %p", p);
+    p= p->next_commit_ordered;
+    if (!p)
+      break;
+    //fprintf(stderr, " %p", p);
+    p= p->next_commit_ordered;
+    q= q->next_commit_ordered;
+    if (p == q)
+      abort();
+  }
+  //fprintf(stderr, "\n");
+}
+
 /*
   Atomically enqueue a THD at the head of the queue of threads waiting to
   group commit, and return the previous head of the queue.
@@ -1104,12 +1171,15 @@ static THD *
 enqueue_atomic(THD *thd)
 {
   THD *orig_queue;
+  uint32_t count= 0;
 
   thd->next_commit_ordered= group_commit_queue;
+  cyclic_buffer_enqueue(4, 1, thd, 0);
   my_atomic_rwlock_wrlock(&LOCK_group_commit_queue);
 
   do
   {
+    count++;
     /*
       Save the read value of group_commit_queue in each iteration of the loop.
       When my_atomic_casptr() returns TRUE, we know that orig_queue is equal
@@ -1126,12 +1196,14 @@ enqueue_atomic(THD *thd)
                            thd));
   my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue);
 
+  cyclic_buffer_enqueue(5, count, thd, orig_queue);
   return orig_queue;
 }
 
 static THD *
-atomic_grab_reverse_queue()
+atomic_grab_reverse_queue(THD *me)
 {
+  cyclic_buffer_enqueue(2, 0, me, 0);
   my_atomic_rwlock_wrlock(&LOCK_group_commit_queue);
   THD *queue= group_commit_queue;
   while (!my_atomic_casptr((void **)(&group_commit_queue),
@@ -1140,19 +1212,28 @@ atomic_grab_reverse_queue()
     ;
   my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue);
 
+  check_queue(0, queue);
+
+  THD *queue_end= queue;
   /*
     Since we enqueue at the head, the queue is actually in reverse order.
     So reverse it back into correct commit order before returning.
   */
 
   THD *prev= NULL;
+  uint32_t len= 0;
   while (queue)
   {
     THD *next= queue->next_commit_ordered;
     queue->next_commit_ordered= prev;
     prev= queue;
     queue= next;
+    len++;
   }
 
+  cyclic_buffer_enqueue(3, len, me, queue_end);
+
+  check_queue(0, prev);
+
   return prev;
 }
@@ -1423,14 +1504,21 @@ int ha_commit_trans(THD *thd, bool all)
       pthread_mutex_lock(&LOCK_group_commit);
       group_commit_wait_queue_idle();
 
-      THD *queue= atomic_grab_reverse_queue();
+      THD *queue= atomic_grab_reverse_queue(thd);
       /* The first in the queue is the leader. */
       DBUG_ASSERT(queue == thd);
+      if (queue != thd)
+        abort();
 
       /*
        This will set individual error codes in each thd->xid_error, as
        well as set thd->xid_cookie for later unlog() call.
      */
+      cyclic_buffer_enqueue(1, 1, thd, 0);
+      check_queue(thd, queue);
+      my_sleep(1500);
+      cyclic_buffer_enqueue(1, 2, thd, 0);
+      check_queue(thd, queue);
       tc_log->group_log_xid(queue);
 
       /*
@@ -1510,7 +1598,7 @@ int ha_commit_trans(THD *thd, bool all)
   {
     pthread_mutex_lock(&LOCK_group_commit);
     group_commit_wait_queue_idle();
-    THD *queue= atomic_grab_reverse_queue();
+    THD *queue= atomic_grab_reverse_queue(thd);
 
     /*
       Mark the queue busy while we bounce it from one thread to the next.
@@ -1535,6 +1623,7 @@ int ha_commit_trans(THD *thd, bool all)
     pthread_mutex_unlock(&LOCK_commit_ordered);
   }
 
+  abort(/* Just to check that this is dead code in the repro */);
   THD *next= thd->next_commit_ordered;
   if (next)
     group_commit_wakeup_other(next);
Index: work-5.1-groupcommit/sql/log.cc
===================================================================
--- work-5.1-groupcommit.orig/sql/log.cc	2010-06-08 11:31:15.000000000 +0200
+++ work-5.1-groupcommit/sql/log.cc	2010-06-08 11:34:20.000000000 +0200
@@ -6227,6 +6227,7 @@ int TC_LOG_BINLOG::log_xid(THD *thd, my_
   int error;
   DBUG_ENTER("TC_LOG_BINLOG::log_xid");
 
+  abort(/* Just to check that this is dead code in the repro */);
   thd->next_commit_ordered= 0;
   group_log_xid(thd);
   if (thd->xid_error)
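
For reference, here is a minimal standalone sketch (not part of the patch) of
the contract bug fixed in gcc_builtins.h above. The old CAS body dereferenced
*cmp a second time after the swap; once the CAS has published the new value of
*a, the winning caller may already be reusing the memory that cmp points into,
so the late read races with it. The names cas_broken/cas_fixed and the use of
plain intptr_t instead of the int ## S macro machinery are hypothetical
simplifications; it compiles with GCC's __sync builtins:

#include <stdint.h>
#include <stdio.h>

/*
  Broken variant, following the old make_atomic_cas_body in
  gcc_builtins.h: *cmp is read again AFTER the atomic swap. The
  contract is that *cmp may be read only before the CAS and written
  only on failure.
*/
static int
cas_broken(intptr_t *a, intptr_t *cmp, intptr_t set)
{
  intptr_t sav= __sync_val_compare_and_swap(a, *cmp, set);
  int ret= (sav == *cmp);          /* second read of *cmp: too late */
  if (!ret)
    *cmp= sav;
  return ret;
}

/*
  Fixed variant, following the patched body: read *cmp exactly once
  into a local before the swap; write *cmp back only on failure.
*/
static int
cas_fixed(intptr_t *a, intptr_t *cmp, intptr_t set)
{
  intptr_t cmp_val= *cmp;
  intptr_t sav= __sync_val_compare_and_swap(a, cmp_val, set);
  int ret= (sav == cmp_val);
  if (!ret)
    *cmp= sav;
  return ret;
}

int
main(void)
{
  intptr_t a= 1, expected= 1;
  /* Succeeds: a was 1, so it becomes 2 and ret is 1. */
  printf("ret=%d a=%ld\n", cas_fixed(&a, &expected, 2), (long)a);
  /* Fails: a is now 2, not 1; expected is updated to the current value. */
  printf("ret=%d expected=%ld\n", cas_fixed(&a, &expected, 3), (long)expected);
  return 0;
}

The x86-gcc.h variant had the same contract violation in a different form: the
"+a" (*cmp) constraint made the asm write *cmp back unconditionally, even when
the CAS had succeeded. The patch changes it to an "=a" output into a local plus
an "a" (*cmp) input, with the write-back done only when ret is false.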