// p.c (process)
//
// bully election algorithm
//
// compile:  gcc p.c -o p -lpthread
//
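// in a system with n processes, make n copies of p.c, one per process.
// for example, with 3 processes copy p.c to p0.c, p1.c, and p2.c,
// then compile each one separately (e.g. gcc p0.c -o p0 -lpthread).
// each process reads its number from the name it is invoked as, so start
// them as ./p0, ./p1, ./p2.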

#define _REENTRANT
#include "process.h"

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>


// Returns the index of some process other than this one, picked at random
// from [0, PS) excluding self (rand() is seeded in main).
// If there is no other process (PS < 2) it returns self rather than spinning
// forever looking for one.
int p_rand(void)
{
  int p;

  if (PS < 2)
    return self;

  do {
    // compute in double: a float cannot hold rand() exactly, and rounding a
    // large rand() up to RAND_MAX + 1 would yield p == PS (out of range)
    p = (int) ((double) PS * rand() / (RAND_MAX + 1.0));
  } while (p == self);

  return p;
}


// Holds an election on behalf of this process (bully algorithm).
//
// The highest-numbered process (PS - 1) skips straight to announcing itself.
// Any other process sends EnterElection ("ee_<self>") to each higher-numbered
// process in turn and stops at the first one for which esend() returns
// non-NULL; that process is left to run the election. If none of them
// responds, this process sends NewCoordinator ("nc_<self>") to every other
// process and records itself as coordinator.
void enterElection(void)
{
  char msg[MSG_SIZ];
  int peer;

  printf("enterElection\n");

  if (self == PS - 1) {
    printf ("enterElection: i m highest process. tell all i'm coordinator. don't care if somebody else is one\n");
    memset (msg, 0, MSG_SIZ);
    sprintf (msg, "nc_%d", self);
    for (peer = 0; peer < PS; peer++) {
      if (peer == self)
        continue;
      esend (peer, msg);
      printf ("enterElection: told process %d i am coordinator\n", peer);
    }
    coordinator = self;
    return;
  }

  // challenge the higher-priority processes; the first that answers takes over
  for (peer = self + 1; peer < PS; peer++) {
    printf ("enterElection: send EnterElection to higher-priority process %d\n", peer);
    memset (msg, 0, MSG_SIZ);
    sprintf (msg, "ee_%d", self);
    if (esend (peer, msg) != NULL) {
      printf("enterElection: process %d ACKED EnterElection. i lost election. no-op.\n", peer);
      break;
    }
    printf("enterElection(): process %d is down..\n", peer);
  }

  // leaving the loop early (peer < PS) means a higher-priority process replied
  if (peer < PS) {
    printf("enterElection(): higher-priority process replied. election is not my problem\n");
    return;
  }

  printf("enterElection():  no reply from higher-priority. i won. now tell everybody!\n");
  memset (msg, 0, MSG_SIZ);
  sprintf (msg, "nc_%d", self);
  for (peer = 0; peer < PS; peer++) {
    if (peer != self) {
      esend (peer, msg);
      printf ("enterElection(): sent NewCoordinator to process %d\n", peer);
    }
  }
  coordinator = self;
}

// controller
//
// brings this process into the system by starting three threads, in order:
//  1. enterElection - hold an election so a coordinator gets chosen
//  2. dapp          - the distributed app
//  3. election      - the bully algorithm's monitoring loop
// if a thread cannot be created, the controller thread exits on the spot and
// the threads after it are not started.
void controller(void)
{
  const struct {
    pthread_t *tid;
    void (*start) (void);
    const char *failure;
  } jobs[] = {
    { &eetid,   enterElection, "controller can not create enterElection thread, exiting\n" },
    { &dapptid, dapp,          "controller can not create dapp thread, exiting\n" },
    { &etid,    election,      "controller can not create bully thread, exiting\n" },
  };
  size_t k;

  for (k = 0; k < sizeof jobs / sizeof jobs[0]; k++) {
    if (pthread_create (jobs[k].tid, NULL, (void *(*) (void *)) jobs[k].start, (void *) NULL) != 0) {
      printf ("%s", jobs[k].failure);
      pthread_exit (NULL);
    }
  }
}



// pthread cleanup handler: announces itself, then closes the file descriptor
// stored in the int that arg points to.
// election() registers it with pthread_cleanup_push(), so it runs when that
// thread is cancelled.
void cleanup(void *arg)
{
  printf("cleanup(): performing cleanup on a cancelled thread\n");
  close(*(const int *) arg);
}


// election (aka the bully algorithm)
//
// runs forever in its own thread. whenever ctrl_sig == lively and some other
// process is the known coordinator, it sends that coordinator an AreYouUp
// ("ayu_<self>"), with sleeps between rounds. if esend() gets no reply the
// coordinator is presumed down and an election is held: EnterElection
// ("ee_<self>") goes to each higher-numbered process in turn, and if none of
// them replies this process sends NewCoordinator ("nc_<self>") to every other
// process and becomes coordinator.
//
// main() cancels this thread and starts a fresh one whenever it receives a
// NewCoordinator or EnterElection message.
void election(void)
{
  char msg[MSG_SIZ];
  char *ack;
  // this thread never opens a socket of its own. sock stays -1 so the cleanup
  // handler's close() is a harmless no-op instead of closing whatever fd an
  // uninitialized int happened to hold (possibly another thread's socket).
  int sock = -1;
  int i;
  bool high;

  printf("election (aka bully algorithm) thread started\n");

  // deferred, not asynchronous, cancellation: with asynchronous cancellation
  // only pthread_cancel/pthread_setcancelstate/pthread_setcanceltype may be
  // called safely, yet this thread calls printf() and esend(). a pending
  // cancel is acted on at the pthread_testcancel() calls and at cancellation
  // points such as sleep().
  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
  pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
  pthread_cleanup_push(cleanup, (void *) &sock);

  while (true) {
    sleep (1*MUL);
    high = false;
    pthread_testcancel();        // test if thread was cancelled
    while (ctrl_sig != lively) {
      pthread_testcancel();
      sleep (1*MUL);
    }

    if (coordinator != self && coordinator != UNDEFINED) {
      // space out AreYouUp requests
      sleep (10); pthread_testcancel();

      printf ("election: sending a casual AreYouUp to coordinator %d\n", coordinator);
      memset (msg, 0, sizeof msg);
      snprintf (msg, sizeof msg, "ayu_%d", self);
      if (esend (coordinator, msg) == NULL) {
        printf ("election: coordinator %d is down!\n", coordinator);
        coordinator = UNDEFINED;

        // bully step: any live higher-numbered process takes the election over
        for (i = self + 1; i < PS; i++) {
          printf ("poll higher-priority process %d\n",i);
          memset (msg, 0, sizeof msg);
          snprintf (msg, sizeof msg, "ee_%d", self);
          if (esend (i, msg) != NULL) {
            printf("election: higher-priority process %d ACKed EnterElection. i lost election\n",i);
            high = true;
            break;
          }
          else {
            printf("election: higher-priority process %d is down..\n",i);
          }
        }
        if (self == PS - 1) {
          printf("election: but i am the highest process and i am up. i will be coordinator..\n");
          coordinator = self;
        }
        if (!high) {
          printf ("election: higher-priority process didn't respond. i won election!\n");
          memset (msg, 0, sizeof msg);
          snprintf (msg, sizeof msg, "nc_%d", self);
          for (i = 0; i < PS; i++) {
            if (i != self) {
              ack = esend (i, msg);
              printf ("election: told process %d that i am coordinator\n",i);
              if (ack == NULL)
                printf ("election: process %d was down...\n",i);
              else
                printf ("election: process %d heard me..\n",i);
              sleep(2*MUL);
            }
          }
          coordinator = self;
        }
      }
      else {
        printf("election: coordinator %d is alive. RELAX!\n",coordinator);
        pthread_testcancel();
        sleep(5*MUL);
        pthread_testcancel();
      }
    }
  }
  pthread_cleanup_pop(0);
}


// Sends the NUL-terminated message buf to process p over a new TCP connection
// to PID[p].address on elect_PORT, then waits for the reply via
// waited_read(sock, T).
//
// Returns whatever waited_read() returns, or NULL if the host can't be
// resolved, the socket can't be opened or connected, or buf can't be written
// in full. Callers treat NULL as "process p is down".
//
// NOTE(review): who owns the returned buffer is decided by waited_read(),
// which isn't visible here; callers currently never free it.
// NOTE(review): gethostbyname() returns static storage and is not
// thread-safe, yet esend() is called from several threads.
char *esend(int p, char *buf)
{
  struct sockaddr_in serv_adr;
  struct hostent *host;
  size_t len;
  int sock;
  char *ack;

  host = gethostbyname(PID[p].address);
  if (host == NULL) {
    perror("esend(): error getting host");
    return NULL;
  }

  memset(&serv_adr, 0, sizeof(serv_adr));
  serv_adr.sin_family = AF_INET;
  memcpy(&serv_adr.sin_addr, host->h_addr, host->h_length);
  serv_adr.sin_port = htons(elect_PORT);

  // no close() here: there is no descriptor to close when socket() fails
  if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    perror("esend(): error no open socket");
    return NULL;
  }

  if (connect(sock, (struct sockaddr *) &serv_adr, sizeof(serv_adr)) < 0) {
    printf("esend(): connection refused to processor %d\n", p);
    close(sock);
    return NULL;
  }

  // a message that didn't go out can't be acknowledged
  len = strlen(buf);
  if (write(sock, buf, len) != (ssize_t) len) {
    perror("esend(): error writing message");
    close(sock);
    return NULL;
  }

  ack = waited_read(sock, T);
  close(sock);
  return ack;
}

// Recognizes a protocol message of the form "<tag>_<number>".
//
// Returns the number after the first '_' in haystack when needle occurs
// anywhere in haystack (substring match, not exact), or -1 when either
// argument is NULL, needle doesn't occur, there is no '_', or no number
// follows it (or it doesn't fit in an int). Characters after the number are
// ignored, as with the original atoi()-based parsing.
//
// haystack comes straight off the network, so nothing is copied: the old
// version strcpy()'d it into a fixed-size heap buffer (overflow) and leaked it.
// NOTE(review): identifiers starting with '_' are reserved at file scope;
// the name is kept for compatibility with existing callers.
int _strcmp(char *haystack, char *needle)
{
  const char *digits;
  char *end;
  long value;

  if (haystack == NULL || needle == NULL)
    return -1;
  if (strstr(haystack, needle) == NULL)
    return -1;

  digits = strchr(haystack, '_');
  if (digits == NULL)
    return -1;
  digits++;

  errno = 0;
  value = strtol(digits, &end, 10);
  // reject "nc_" / "nc_x" (atoi would have reported process 0) and overflow
  if (end == digits || errno == ERANGE || value < INT_MIN || value > INT_MAX)
    return -1;

  return (int) value;
}

// restarter thread
//
// meant to restart process *p (an index into PID). for now it only forks a
// child that runs "ping -c 1" against that process's host; the log messages
// still speak of the Expect script that is supposed to do the real restart.
// the thread detaches itself (nobody joins restarter threads) and reaps its
// child so finished children don't pile up as zombies.
void rst (void *p)
{
  static const char exec_failed[] = "error spawning off the Expect script...sorry..\n";
  int target = *((int *) p);   // copy now: the caller may reuse the int p points to
  pid_t child;

  pthread_detach (pthread_self ());

  printf ("restarter(): about to restart process %d on host %s\n",
          target, PID [target].address);

  child = fork ();
  if (child < 0) {
    perror ("restarter(): fork failed");
    return;
  }

  if (child == 0) {
    // "-c" and "1" are separate argv entries; the sentinel must be a char *
    execl ("/bin/ping", "ping", "-c", "1", PID [target].address, (char *) NULL);
    // exec failed. this child was forked from a multithreaded process, so
    // use only async-signal-safe calls (write, not printf) and never return
    // into the copy of the parent's program.
    write (STDOUT_FILENO, exec_failed, sizeof exec_failed - 1);
    _exit (127);
  }

  printf ("Expect script dispatched\n");
  waitpid (child, NULL, 0);
}


// distributed app
//
// the purpose of the distributed app is two-fold
//
// 1. if a machine that is participating in the bully algorithm goes down, the
//    distributed app restarts it. this is accomplished by occasionally pinging the
//    machines (in random order). if a machine fails to respond, it is assumed that it
//    is no longer up. if THIS process is the coordinator it will dispatch an assistant
//    that will log into the machine that went down and restart the bully algorithm
//
// 2. the actual task that needs to be performed. only now we get to the bottom of things.
//    the job of the coordinator to oversee the execution of some process. if THIS process
//    is the coordinator it must see to it that the task is running, by using the same
//    idea employed in 1.
void dapp(void)
{
  int sock, len, ret, p;
  struct sockaddr_in serv_adr;
  struct hostent *host;
  static char ack [3];
  char buf [BUFSIZ];

  printf("dapp thread started\n");

  while (true) {
    bzero (buf, BUFSIZ); bzero (ack, strlen (ack));
    if (ctrl_sig == lively && coordinator != -1) {
      printf ("[coordinator=%d]\n", coordinator);
      p = p_rand();
      host = gethostbyname (PID [p].address);   // pick a random host
      if (host == (struct hostent *) NULL) {
	perror ("dapp: error getting host. ?will retry...");
      }
      else { 
	memset (&serv_adr, 0, sizeof (serv_adr));
	serv_adr.sin_family = AF_INET;
	memcpy (&serv_adr.sin_addr, host -> h_addr, host -> h_length);
	serv_adr.sin_port = htons (keepalive_PORT);
	
	if ( (sock = socket (AF_INET, SOCK_STREAM,0)) < 0) { //cant opensocket
	  perror("dapp: could not open socket. send dispatcher to restart process.");
	  close (sock);
	  if ( (ret = pthread_create 
	       (&rsttid, NULL, (void *(*) (void *)) rst, (void *) &p)) != 0 ) {
	    printf ("failed to create restart dispatcher thread\n");
	    pthread_exit (NULL);
	  }
	}
	
	if (connect (sock, (struct sockaddr * ) &serv_adr, sizeof (serv_adr)) < 0) {
	  perror ("dapp: connection refused. send dispatcher to restart process.");
	  close(sock);
	  if ( (ret = pthread_create 
		(&rsttid, NULL, (void *(*) (void *)) rst, (void *) &p)) != 0 ) {
	    printf ("failed to create restart dispatcher thread\n");
	    pthread_exit (NULL);
	  }
	} 
	else {
	  // send a PING message. if got an ACK all is good otherwise send dispatcher
	  strcpy (buf ,"PING");
	  write (sock, buf, strlen (buf));
	  strcpy (ack, waited_read (sock, T));
	  close (sock);
	  if (strcmp (ack, "ACK") == 0)
	    printf ("dapp: process %d is alive. good.\n", p);
	  else {
	    printf ("dapp: process %d did not ack our ping. sending dispatcher\n", p);
	    if ( (ret = pthread_create 
		  (&rsttid, NULL, (void *(*) (void *)) rst, (void *) &p)) != 0 ) {
	      printf ("failed to create restart dispatcher thread\n");
	      pthread_exit (NULL);
	    }
	  }
	}
      }
    }
    // sleep long enough so that if a dispatcher was sent out, they can restart the process
    sleep (30); 
  }
} 

int 
main (int argc, char *argv[]) {
  int sender;
  int server_sock,  client_sock,   clnt_len;
  struct sockaddr_in clnt_adr, serv_adr;
  
  int ret, len;
  char buf[BUFSIZ];

  srand (getpid());              // seed the RNG
  init();
  self = atoi (argv [0] + 3);     // please invoke the process using ./pn 
  printf ("i am process %d\n", self);
  
  // start up the controller
  if ( (ret = pthread_create (&ctid, NULL, (void *(*) (void *)) controller, (void *) NULL)) != 0) {
    printf ("can not start controller thread, exiting\n");
    pthread_exit (NULL);
  }

  // do the server stuff
  if ( (server_sock = socket (AF_INET, SOCK_STREAM, 0)) < 0) { perror ("socket error");  }
  memset (&serv_adr, 0, sizeof (serv_adr));
  serv_adr.sin_family = AF_INET;
  serv_adr.sin_addr.s_addr = htonl (INADDR_ANY);
  serv_adr.sin_port = htons (elect_PORT);
  
  if (bind (server_sock, (struct sockaddr *) &serv_adr, sizeof (serv_adr)) < 0) {
    perror ("bind error");
    close(server_sock);
  }
  
  if (listen (server_sock, MAX_LISTEN) < 0) {
    perror ("listen error");
    close (server_sock);
  }
  
  clnt_len = sizeof (clnt_adr);
  
  // server is listening on the election socket
  while (true) {
    if ( (client_sock = accept (server_sock, (struct sockaddr *) &clnt_adr, &clnt_len)) < 0) {
      perror ("accept error");
      close (server_sock);
    }
    
    bzero (buf, BUFSIZ);
    // just sit and listen to the election line
    while ( (len = read (client_sock, buf, BUFSIZ)) < 0)   // if no communications, just idle
      ;
    
    printf ("network layer: +ACK [%s]\n", buf);
 
    if ( (sender = _strcmp (buf, "nc")) >= 0) {         
      // take care of NewCoordinator
      printf("got NewCoordinator from process %d\n", sender);
      pthread_cancel(etid);
      printf("monitor_election(): election thread killed\n");
      coordinator = sender;
      bzero (buf, BUFSIZ);
      sprintf (buf, "nc");                 // ACK NewCoordinator
      write (client_sock, buf, strlen (buf));
      
      if( (ret = pthread_create (&etid, NULL, (void *(*) (void *)) election, (void *) NULL))!=0) {
	printf ("can not create election thread\n");
	pthread_exit (NULL);
      }
      printf ("new election thread has been created\n");
    }
    else if ( (sender = _strcmp (buf, "ayu")) >= 0) {
      printf ("received AreYouUp from process %d\n", sender);
      // tell 'em we are up, let them know who we think  the coordinator is
      bzero (buf, BUFSIZ);
      sprintf (buf, "ayu_%d", coordinator);  
      write (client_sock, buf, strlen (buf));
    }
    else if ( (sender = _strcmp (buf, "ee")) >= 0) {
      printf ("received EnterElection from process %d\n", sender);
      bzero (buf, BUFSIZ);
      sprintf (buf, "ee");
      write (client_sock, buf, strlen (buf));

      printf ("kill election thread\n");
      pthread_cancel(etid);
      
      // tell everybody else to enter election!
      if((ret=pthread_create(&dapptid, NULL,(void *(*) (void *)) enterElection,(void *) NULL)) !=0 ) {
	printf("controller can not create enterElection thread, exiting\n");
	pthread_exit(NULL);
      }
      
      if( (ret = pthread_create (&etid, NULL, (void *(*) (void *)) election, (void *) NULL)) != 0) {
	printf ("can not create election thread, exiting\n");
	pthread_exit (NULL);
      }
      printf ("new election thread has been started\n"); 
    }
    // although PING/ACK is not part of the election protocol, we provide this service to
    // the keepalive thread... why not?
    else if (!strcmp (buf, "PING")) {         
      printf ("got a PING from someone. sending an ACK\n");
      bzero (buf, BUFSIZ);
      sprintf (buf, "ACK");
      write (client_sock, buf, strlen (buf));
    }
    close (client_sock);
  }
}





