c-与setjmp / longjmp一起使用fork
作者:互联网
我正在尝试使用fork与setjmp / longjmp结合使用基于多线程fork的检查点方案.我希望我的解决方案能奏效,但没有预期.下面显示了该代码以及检查点/回滚的用法示例.
主要思想是自己为线程分配堆栈,方法是使用函数pthread_create_with_stack,然后仅使用主线程中的派生.分叉的进程(检查点)在开始时被挂起,并且在唤醒(回滚)时,分叉的主线程通过调用pthread_create来重新创建线程,并使用与原始进程中的线程相同的堆栈.另外,longjmp在线程例程的开头就开始执行,以便在将进程作为检查点进行分叉时跳转到代码中的同一点.请注意,所有setjmp调用都在函数my_pthread_barrier_wait中完成,因此没有线程获得锁.
我认为这里的问题是setjmp / lonjmp. getcontext / savecontext / makecontext会在这里有帮助吗?甚至可以在这里使用setjmp / longjmp这样的方式工作吗?任何解决方案将不胜感激.
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <unistd.h>
#include <semaphore.h>
#include <signal.h>
#include <sys/types.h>
#include <setjmp.h>
#define PERFORM_JMP
#define NUM_THREADS 4
void *stackAddr[NUM_THREADS];
pthread_t thread[NUM_THREADS];
jmp_buf buf[NUM_THREADS];
pthread_attr_t attr[NUM_THREADS];
pthread_barrier_t bar;
sem_t sem;
pid_t cp_pid;
int rollbacked;
int iter;
long thread_id[NUM_THREADS];
void *BusyWork(void *t);
void sig_handler(int signum)
{
printf( "signal_handler posting sem!\n" );
sem_post( &sem );
}
int pthread_create_with_stack( void *(*start_routine) (void *), int tid )
{
const size_t STACKSIZE = 0xC00000; //12582912
size_t i;
pid_t pid;
int rc;
printf( "tid = %d\n", tid );
pthread_attr_init( &attr[tid] );
stackAddr[tid] = malloc(STACKSIZE);
pthread_attr_setstack( &attr[tid], stackAddr[tid], STACKSIZE );
thread_id[tid] = tid;
rc = pthread_create( &thread[tid], &attr[tid], start_routine, (void*)&thread_id[tid] );
if (rc)
{
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
return rc;
}
pid_t checkpoint()
{
pid_t pid;
int t, rc;
switch (pid=fork())
{
case -1:
perror("fork");
break;
case 0: // child process starts
sem_wait( &sem );
rollbacked = 1;
printf( "case 0: rollbacked = 1, my pid is %d\n", getpid() );
for( t = 1; t < NUM_THREADS; t++ )
{
printf( "checkpoint: creating thread %d again\n", t );
rc = pthread_create( &thread[t], &attr[t], BusyWork, (void*)&thread_id[t] );
if (rc)
{
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
return 1; // child process ends
default: // parent process starts
return pid;
}
}
void restart_from_checkpoint( pid_t pid )
{
printf( "Restart_from_checkpoint, sending signal to %d!\n", pid );
kill( pid, SIGUSR1 );
exit( 0 );
}
void take_checkpoint_or_rollback( int sig_diff )
{
if ( cp_pid )
{
if ( sig_diff )
{
printf( "rollbacking\n" );
if ( !rollbacked )
restart_from_checkpoint( cp_pid );
}
else
{
kill( cp_pid, SIGKILL );
cp_pid = checkpoint();
printf( "%d: cp_pid = %d!\n", getpid(), cp_pid );
}
}
else
cp_pid = checkpoint();
}
void my_pthread_barrier_wait( int tid, pthread_barrier_t *pbar )
{
pthread_barrier_wait( pbar );
#ifdef PERFORM_JMP
if ( tid == 0 )
{
if ( !rollbacked )
{
take_checkpoint_or_rollback( ++iter == 4 );
}
}
if ( setjmp( buf[tid] ) != 0 ) {}
else {}
printf( "%d: %d is waiting at the second barrier!\n", getpid(), tid );
#endif
pthread_barrier_wait( pbar );
}
void *BusyWork(void *t)
{
volatile int i;
volatile long tid = *((long*)t);
volatile double result = 0.0;
printf( "thread %ld in BusyWork!\n", tid );
#ifdef PERFORM_JMP
if ( rollbacked )
{
printf( "hmm, thread %ld is now doing a longjmp, goodluck!\n", tid );
longjmp( buf[tid], 1 );
}
#endif
printf("Thread %ld starting...\n",tid);
for ( i = 0; i < 10; i++)
{
result += (tid+1) * i;
printf( "%d: tid %ld: result = %g\n", getpid(), tid, result );
my_pthread_barrier_wait(tid, &bar);
}
printf("Thread %ld done. Result = %g\n", tid, result);
//pthread_exit((void*) t);
}
int main (int argc, char *argv[])
{
int rc;
long t;
void *status;
/* Initialize and set thread detached attribute */
pthread_barrier_init(&bar, NULL, NUM_THREADS);
#ifdef PERFORM_JMP
signal(SIGUSR1, sig_handler);
sem_init( &sem, 0, 0 );
#endif
for( t = 1; t < NUM_THREADS; t++ )
{
printf( "Main: creating thread %ld\n", t );
rc = pthread_create_with_stack( BusyWork, t ); // This is the line 52
if (rc)
{
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
thread_id[0] = 0;
BusyWork( &thread_id[0] );
/* Free attribute and wait for the other threads */
for(t=1; t<NUM_THREADS; t++)
{
rc = pthread_join(thread[t], &status);
if (rc)
{
printf("ERROR; return code from pthread_join() is %d\n", rc);
exit(-1);
}
printf("Main: completed join with thread %ld having a status"
"of %ld\n",t,(long)status);
}
printf("Main: program completed. Exiting.\n");
pthread_exit(NULL);
}
解决方法:
您试图做的事情是根本不可能的. fork从根本上与同步不兼容.即使您可以在子进程中可靠地重新创建线程,它们也将具有新的线程ID,因此它们将不是他们应该拥有的锁的所有者.
进行检查点的唯一方法是使用它的高级操作系统支持.这将必须包括单独的pid名称空间,以便该程序的检查点副本将具有相同的pid,并且其所有线程将具有相同的线程id.即使那样,如果它正在与其他进程或外界进行通信,它也将无法工作.我相信有一些工具可以在Linux上执行此操作,但是我对它们并不熟悉,因此,您现在可以进入hack级别,在这里可以适当地询问是否有更好的方法来实现您正在尝试的功能去做.
标签:setjmp,c-3,linux,x86,fork 来源: https://codeday.me/bug/20191102/1988402.html