/* Copyright (c) 2012, 2023, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA */

/** \file The new version of xcom is a major rewrite to allow
    transmission of multiple messages from several sources
    simultaneously without collision. The interface to xcom is largely
    intact; one notable change is that xcom will consider the message
    delivered as soon as it has got a majority. Consequently, the VP
    set will not necessarily show all nodes which will actually
    receive the message.

    OHKFIX Add wait for complete last known node set to mimic the old
    semantics.


    IMPORTANT: What xcom does and what it does not do:

    xcom messages are received in the same order on all nodes.

    xcom guarantees that if a message is delivered to one node, it will
    eventually be seen on all other nodes as well.

    xcom messages are available to a crashed node when it comes up
    again if at least one node which knows the value of the message
    has not crashed. The size of the message cache is configurable.

    OHKFIX Add logging to disk to make messages durable across system
    crash and to increase the number of messages which may be cached.

    There is no guarantee whatsoever about the order of messages from
    different nodes, not even the order of multiple messages from the
    same node. It is up to the client to impose such an order by
    waiting on a message before it sends the next.

    xcom can notify the client that a message has timed out, and in
    that case will try to cancel the message, but it cannot guarantee
    that a message which has timed out will not be delivered.

    xcom attaches a node set to each message as it is delivered to the
    client. This node set reflects the current node set that xcom
    believes is active; it does not mean that the message has been
    delivered yet to all nodes in the set. Neither does it mean that
    the message has not been delivered to the nodes not in the set.

    A cache of Paxos state machines is central to the new design. The
    purpose of the cache is both to store a window of messages, and to
    decouple the different parts of xcom, like message proposal,
    message delivery and execution, and recovery.  The old cache was
    limited to caching messages, and a single state machine ran the
    combined VP and Paxos algorithm. This constrained xcom to deliver
    only a single message at a time.

    Each instance of the Paxos state machine implements the basic
    Paxos protocol.  Unlike the cache in the old system, the new cache
    is not cleared when a site is deleted.  This removes some problems
    related to message delivery during site deletion.  The cache is a
    classic fixed size LRU with a hash index.
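
    A minimal sketch of a fixed-size LRU cache with a hash index, for
    illustration only. The names (demo_synode, demo_cache,
    demo_cache_get) are hypothetical, the key is a simplified
    (msgno, node) pair standing in for synode_no, and the sketch does
    not model which Paxos machines xcom is allowed to evict. A hit moves
    the entry to the front of the LRU list; a miss recycles the least
    recently used slot and re-indexes it under the new key.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

// Hypothetical simplified key standing in for synode_no.
typedef struct { uint64_t msgno; uint32_t node; } demo_synode;

enum { DEMO_CACHE_SIZE = 4, DEMO_HASH_SIZE = 7 };

typedef struct demo_slot {
  demo_synode key;
  int in_use;
  struct demo_slot *lru_prev, *lru_next; // LRU list, head = most recent
  struct demo_slot *hash_next;           // chain within one hash bucket
} demo_slot;

typedef struct {
  demo_slot slots[DEMO_CACHE_SIZE];   // fixed number of entries
  demo_slot *buckets[DEMO_HASH_SIZE]; // hash index
  demo_slot *lru_head, *lru_tail;
} demo_cache;

static size_t demo_hash(demo_synode k) {
  return (size_t)((k.msgno * 31u + k.node) % DEMO_HASH_SIZE);
}

static void lru_unlink(demo_cache *c, demo_slot *s) {
  if (s->lru_prev) s->lru_prev->lru_next = s->lru_next; else c->lru_head = s->lru_next;
  if (s->lru_next) s->lru_next->lru_prev = s->lru_prev; else c->lru_tail = s->lru_prev;
  s->lru_prev = s->lru_next = NULL;
}

static void lru_push_front(demo_cache *c, demo_slot *s) {
  s->lru_prev = NULL;
  s->lru_next = c->lru_head;
  if (c->lru_head) c->lru_head->lru_prev = s; else c->lru_tail = s;
  c->lru_head = s;
}

void demo_cache_init(demo_cache *c) {
  size_t i;
  memset(c, 0, sizeof *c);
  for (i = 0; i < DEMO_CACHE_SIZE; i++) lru_push_front(c, &c->slots[i]);
}

// Unlink s from its hash bucket; s must currently be indexed.
static void hash_remove(demo_cache *c, demo_slot *s) {
  demo_slot **pp = &c->buckets[demo_hash(s->key)];
  while (*pp != s) pp = &(*pp)->hash_next;
  *pp = s->hash_next;
  s->hash_next = NULL;
}

// Return the slot for key, evicting the least recently used entry on a miss.
// *hit is set to 1 if the key was already cached, 0 otherwise.
demo_slot *demo_cache_get(demo_cache *c, demo_synode key, int *hit) {
  size_t b = demo_hash(key);
  demo_slot *s;
  assert(c != NULL && hit != NULL);
  for (s = c->buckets[b]; s != NULL; s = s->hash_next) {
    if (s->key.msgno == key.msgno && s->key.node == key.node) {
      lru_unlink(c, s);
      lru_push_front(c, s);
      *hit = 1;
      return s;
    }
  }
  s = c->lru_tail;
  if (s->in_use) hash_remove(c, s);
  lru_unlink(c, s);
  s->key = key;
  s->in_use = 1;
  s->hash_next = c->buckets[b];
  c->buckets[b] = s;
  lru_push_front(c, s);
  *hit = 0;
  return s;
}
```

    With DEMO_CACHE_SIZE = 4, filling keys {0 0} to {3 0} and then
    touching {0 0} again leaves {1 0} as the least recently used entry,
    so the next miss evicts it.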

    Some extensions to the basic Paxos algorithm have been implemented:

    A node has ownership of all synodes with its own node number. Only
    a node with node number N can propose a value for synode {X N},
    where X is the sequence number, and N is the node number. Other
    nodes can only propose the special value no_op for synode {X N}.
    The reason for this is to retain the leaderless Paxos algorithm,
    but to avoid collisions between nodes which are competing for the
    same synode number. With this scheme, each node has its own unique
    number series during normal operation. The scheme has the
    following implications:

    1. If a node N has not already proposed a value for the synode {X N},
    it may at any time send a LEARN message to the other nodes with
    the reserved value no_op, without going through phase 1 and 2 of
    Paxos. This is because the other nodes are constrained to propose
    no_op for this synode, so the final outcome will always be no_op.
    To avoid unnecessary message transmission, a node will try to
    broadcast the no_op LEARN messages by piggybacking the information
    on the messages of the basic Paxos protocol.

    2. Other nodes which want to find the value of synode {X N} may do
    so by trying to get the value no_op accepted by following the
    basic Paxos algorithm. The result will be the actual value
    proposed by node N if it has done so, otherwise no_op. This will
    typically only be necessary when a node is down, and the other
    nodes need to find the values from the missing node in order to be
    able to continue execution.
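
    The ownership rule and implication 1 can be expressed as two
    predicates. This is a hedged sketch with hypothetical types
    (own_synode, own_value_kind) rather than xcom's own data
    structures; it only encodes who may propose what, not the Paxos
    rounds themselves.

```c
#include <assert.h>
#include <stdint.h>

// Hypothetical stand-ins; xcom itself uses synode_no and pax_msg.
typedef struct { uint64_t msgno; uint32_t node; } own_synode;
typedef enum { OWN_VALUE, OWN_NO_OP } own_value_kind;

// Only node N may propose a real value for {X N}; any node may propose no_op.
int may_propose(uint32_t my_node, own_synode s, own_value_kind kind) {
  return kind == OWN_NO_OP || s.node == my_node;
}

// Implication 1: the owner may broadcast LEARN(no_op) without running
// phases 1 and 2, but only while it has not proposed a value itself.
int may_learn_noop_directly(uint32_t my_node, own_synode s, int already_proposed) {
  return s.node == my_node && !already_proposed;
}
```

    For example, node 1 may propose no_op for {5 2} but not a real
    value, while node 2 may do either.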

    Messages are delivered in order to the client, and the order is
    determined by the sequence number and the node number, with the
    sequence number as the most significant part.
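
    The delivery order can be sketched as a comparison on
    (msgno, node) with msgno as the most significant part. The type
    ord_synode is a hypothetical stand-in for synode_no without
    group_id; ord_synode_next mirrors the wrap-around performed by
    incr_synode() in this file, but takes the node count as a parameter
    instead of looking up the site definition.

```c
#include <assert.h>
#include <stdint.h>

// Hypothetical stand-in for synode_no (group_id omitted).
typedef struct { uint64_t msgno; uint32_t node; } ord_synode;

// Returns <0, 0 or >0. msgno is the most significant part; node breaks ties.
int ord_synode_cmp(ord_synode a, ord_synode b) {
  if (a.msgno != b.msgno) return a.msgno < b.msgno ? -1 : 1;
  if (a.node != b.node) return a.node < b.node ? -1 : 1;
  return 0;
}

// Next synode in delivery order for a group of maxnodes nodes. This mirrors
// the wrap-around in incr_synode(), with maxnodes passed in explicitly.
ord_synode ord_synode_next(ord_synode s, uint32_t maxnodes) {
  assert(maxnodes > 0);
  s.node++;
  if (s.node >= maxnodes) {
    s.node = 0;
    s.msgno++;
  }
  return s;
}
```

    With three nodes, {1 2} is followed by {2 0}, and {2 1} sorts after
    {2 0} because both share sequence number 2 and the node number
    breaks the tie.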

    The xcom network interface has been redesigned and is now
    implemented directly on top of TCP, and has so far been completely
    trouble free. We use poll() or select() to implement non-blocking
    send and receive, but libev could equally well have been used.

    Multicast is implemented on top of unicast as before, but the
    implementation is prepared to use real multicast with relatively
    minor changes.

    The roles of proposer, acceptor/learner, and executor are now
    directly mapped to unique task types which interact with the Paxos
    state machines, whereas the previous implementation folded all the
    roles into a single event driven state machine.

    The following terminology will be used:

    A node is an instance of the xcom thread. There is only one instance
    of the xcom thread in the agent.
    A client is the application which is using xcom to send messages.
    A thread is a real OS thread.
    A task is a logical process. It is implemented by coroutines and
    an explicit stack.

    The implementation of tasks and non-blocking socket operations is
    isolated in task.h and task.c.

    A node will open a tcp connection to each of the other nodes. This
    connection is used for all communication initiated by the node,
    and replies to messages will arrive on the connection on which the
    original message was sent.

    static int tcp_server(task_arg);

    The tcp_server listens on the xcom port and starts an
    acceptor_learner_task whenever a new connection is detected.

    static int tcp_reaper_task(task_arg);

    Closes tcp connections which have been unused for too long.

    static int sender_task(task_arg);

    The sender_task waits for tcp messages on its input queue and
    sends them on the tcp socket. If the socket is closed for any
    reason, the sender_task will reconnect the socket. There is one
    sender_task for each socket. The sender task exists mainly to
    simplify the logic in the other tasks, but it could have been
    replaced with a coroutine which handles the connection logic after
    having reserved the socket for its client task.

    static int generator_task(task_arg);

    The generator_task reads messages from the client queue and moves
    them into the input queue of the proposer_task.

    OHKFIX Use a tcp socket instead of the client queue. We can then
    remove the generator_task and let the acceptor_learner_task do the
    dispatching.

    static int proposer_task(task_arg);

    Assign a message number to an outgoing message and try to get it
    accepted. There may be several proposer tasks on each node
    working in parallel. If there are multiple proposer tasks, xcom
    cannot guarantee that the messages will be sent in the same order as
    received from the client.

    static int acceptor_learner_task(task_arg);

    This is the server part of the xcom thread. There is one
    acceptor_learner_task for each node in the system. The
    acceptor_learner_task reads messages from the socket, finds the correct
    Paxos state machine, and dispatches to the correct message handler
    with the state machine and message as arguments.

    static int reply_handler_task(task_arg);

    The reply_handler_task does the same job as the
    acceptor_learner_task, but listens on the socket which the node
    uses to send messages, so it will handle only replies on that
    socket.

    static int executor_task(task_arg);

    The executor_task waits for a Paxos message to be accepted. When
    the message is accepted, it is delivered to the client,
    unless it is a no-op. In either case, the executor_task steps to
    the next message and repeats the wait. If it times out waiting for
    a message, it will try to get a no-op accepted.
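
    The executor's stepping behaviour can be illustrated with a
    simplified model in which the cache is an array of decided values.
    In this hypothetical encoding (EXEC_UNDECIDED, EXEC_NO_OP and
    executor_run are illustrative names, not xcom's), -1 marks a slot
    whose value is not yet known and 0 marks no_op. The sketch omits
    waiting and timeouts: it stops at the first undecided slot, which
    is where the real executor_task would wait and eventually try to
    get a no-op accepted.

```c
#include <assert.h>
#include <stddef.h>

// Hypothetical encoding of slots: -1 = not yet decided, 0 = no_op,
// any positive value = an application message to deliver.
#define EXEC_UNDECIDED (-1)
#define EXEC_NO_OP 0

// Deliver decided slots in order starting at *next. no_op slots are skipped;
// other values are appended to out. Stops at the first undecided slot and
// returns the number of values delivered in this call.
size_t executor_run(const int *slots, size_t n, size_t *next, int *out) {
  size_t delivered = 0;
  assert(slots != NULL && next != NULL && out != NULL);
  while (*next < n && slots[*next] != EXEC_UNDECIDED) {
    if (slots[*next] != EXEC_NO_OP) out[delivered++] = slots[*next];
    (*next)++;
  }
  return delivered;
}
```

    For slots {7, no_op, 9, undecided, 4}, one call delivers 7 and 9
    and stops at index 3; once that slot is decided as no_op, the next
    call delivers 4.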

    static int alive_task(task_arg);

    Sends i-am-alive to other nodes if there has been no normal traffic
    for a while. It also pings nodes which seem to be inactive.

    static int detector_task(task_arg);

    The detector_task periodically scans the set of connections from
    other nodes and sees if there has been any activity. If there has
    been no activity for some time, it will assume that the node is
    dead, and send a view message to the client.
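
    A single detector scan can be sketched as below. The det_node
    record, timestamps in seconds and the timeout parameter are
    assumptions for illustration; the sketch only counts nodes that
    newly crossed the inactivity threshold, where the real
    detector_task would send a view message to the client.

```c
#include <assert.h>
#include <stddef.h>

// Hypothetical per-node record: time (in seconds) of the last traffic seen
// from the node, and whether it has already been reported as dead.
typedef struct {
  double last_seen;
  int reported_dead;
} det_node;

// One periodic scan. Nodes silent for longer than timeout are marked dead;
// nodes that show activity again are cleared. Returns how many nodes newly
// crossed the threshold, i.e. the cases where a view change would be sent.
size_t detector_scan(det_node *nodes, size_t n, double now, double timeout) {
  size_t i, newly_dead = 0;
  assert(timeout > 0.0);
  for (i = 0; i < n; i++) {
    int silent = now - nodes[i].last_seen > timeout;
    if (silent && !nodes[i].reported_dead) {
      nodes[i].reported_dead = 1;
      newly_dead++;
    } else if (!silent) {
      nodes[i].reported_dead = 0;
    }
  }
  return newly_dead;
}
```

    Tracking reported_dead keeps a node that stays silent from being
    reported again on every scan.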

    static int boot_task(task_arg);

    The boot task is started whenever xcom has no site definition. It
    listens on the input queue until it detects either a boot or
    recovery. In case of a boot, it will wait for a unified boot
    message.  In case of local recovery, it will wait until it has seen
    all recover messages.  In both cases, the proposer task will try
    to get those messages accepted.

    static int boot_killer_task(task_arg);

    Abort the boot process if there is no progress.

    static int net_boot_task(task_arg);
    static int net_recover_task(task_arg);

    Reconfiguration:

    The xcom reconfiguration process is essentially the one described in
    "Reconfiguring a State Machine" by Lamport et al. as the R-alpha
    algorithm.
    We execute the reconfiguration command immediately, but the config is
    only valid after a delay of alpha messages.
    The parameter alpha corresponds to the event_horizon variable in
    this implementation.
    All tcp messages from beyond the event horizon will be ignored.
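
    A hedged sketch of the two rules above. config_start mirrors
    synode_set_to_event_horizon() in this file (msgno + event_horizon
    + 1). The horizon check is an assumption: here a message counts as
    beyond the event horizon when its msgno is at least the last
    executed msgno plus event_horizon; the exact reference point and
    boundary used by xcom may differ.

```c
#include <assert.h>
#include <stdint.h>

// First msgno at which a configuration decided in change_msgno becomes
// valid. Mirrors synode_set_to_event_horizon(): msgno + event_horizon + 1.
uint64_t config_start(uint64_t change_msgno, unsigned int event_horizon) {
  return change_msgno + event_horizon + 1;
}

// Assumption for illustration: a message is beyond the event horizon when it
// is event_horizon or more messages past the last executed message.
int beyond_event_horizon(uint64_t msgno, uint64_t executed_msgno,
                         unsigned int event_horizon) {
  assert(event_horizon > 0);
  return msgno >= executed_msgno + event_horizon;
}
```

    With event_horizon at its default of EVENT_HORIZON_MIN (10), a
    configuration change decided in message 100 takes effect from
    message 111 in this sketch.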

*/

#include "x_platform.h"

#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <assert.h>
#include <signal.h>
#include <sys/time.h>
#include <limits.h>

#ifndef WIN
#include <sys/socket.h>
#include <netdb.h>
#include <sys/ioctl.h>
#include <net/if.h>
#ifndef __linux__
#include <sys/sockio.h>
#endif
#endif

#if defined(WIN32) || defined(WIN64)
#include <windows.h>
#endif

#ifndef _WIN32
#include <poll.h>
#endif

#include "xdr_utils.h"
#include "xcom_common.h"

#include "task_os.h"

#include "xcom_vp.h"

#include "simset.h"
#include "app_data.h"

#include "task.h"
#include "node_no.h"
#include "server_struct.h"
#include "xcom_detector.h"
#include "site_struct.h"
#include "xcom_transport.h"
#include "xcom_base.h"

#ifdef XCOM_HAVE_OPENSSL
#include "xcom_ssl_transport.h"
#endif

#include "task.h"
#include "task_net.h"
#include "task_debug.h"
#include "xcom_statistics.h"
#include "node_set.h"
#include "node_list.h"
#include "bitset.h"

#include "xcom_cache.h"

#include "xcom_vp_str.h"
#include "pax_msg.h"
#include "xcom_msg_queue.h"
#include "xcom_recover.h"
#include "synode_no.h"
#include "sock_probe.h"
#include "xcom_interface.h"
#include "xcom_memory.h"
#include "site_def.h"
#include "xcom_cfg.h"

#ifdef XCOM_HAVE_OPENSSL
#include "openssl/ssl.h"
#endif

/* {{{ Defines and constants */

#define SYS_STRERROR_SIZE 512
#define TERMINATE_DELAY 3.0
#define EVENT_HORIZON_MIN 10
unsigned int event_horizon = EVENT_HORIZON_MIN;

static void set_event_horizon(unsigned int eh) MY_ATTRIBUTE((unused));
/* purecov: begin deadcode */
static void set_event_horizon(unsigned int eh)
{
	DBGOUT(FN; NDBG(eh,u));
	event_horizon = eh;
}
/* purecov: end */

/* The number of proposers on one node */
#define PROPOSERS 10

/* Limit the number of acceptors */
/* #define MAXACCEPT 5  */

/* Skip prepare for first ballot */
int const threephase = 0;

/* Error injection for testing */
#define INJECT_ERROR 0

/* Crash a node early */
/* #define CRASH 1 */

/* }}} */


#include "retry.h"

/* #define USE_EXIT_TYPE */
/* #define NO_SWEEPER_TASK */

/* Limit batch size to a sensible amount */
enum{
	MAX_BATCH_SIZE = 0x3fffffff
};

int ARBITRATOR_HACK = 0;
static int AUTOBATCH = 1;
#define AGGRESSIVE_SWEEP

static int const no_duplicate_payload = 1;

/* Use buffered read when reading messages from the network */
static int use_buffered_read = 1;

/* Used to handle OOM errors */
static unsigned short oom_abort = 0;

/* {{{ Forward declarations */

long	get_unique_long(void);
unsigned long	msg_count(app_data_ptr a);
void	get_host_name(char *a, char *name);

static double	wakeup_delay(double old);

/* Task types */
static int	proposer_task(task_arg arg);
static int	executor_task(task_arg arg);
static int	sweeper_task(task_arg arg);
extern int	alive_task(task_arg arg);
static int	generator_task(task_arg arg);
extern int	detector_task(task_arg arg);

static int	finished(pax_machine *p);
static int	accepted(pax_machine *p);
static int	started(pax_machine *p);
static synode_no first_free_synode(synode_no msgno);
static void free_forced_config_site_def();

extern void	bit_set_or(bit_set *x, bit_set const *y);

/* }}} */

/* {{{ Global variables */

int	xcom_shutdown = 0; /* Xcom_Shutdown flag */
synode_no executed_msg;    /* The message we are waiting to execute */
synode_no max_synode;      /* Max message number seen so far */
task_env *boot = NULL;
task_env *detector = NULL;
task_env *killer = NULL;
task_env *net_boot = NULL;
task_env *net_recover = NULL;
void	*xcom_thread_input = 0;

static void	init_proposers();

void	init_base_vars()
{
	xcom_shutdown = 0; /* Xcom_Shutdown flag */
	executed_msg = null_synode; /* The message we are waiting to execute */
	max_synode = null_synode; /* Max message number seen so far */
	boot = NULL;
	detector = NULL;
	killer = NULL;
	net_boot = NULL;
	net_recover = NULL;
	xcom_thread_input = 0;
}

static task_env *executor = NULL;
static task_env *sweeper = NULL;
static task_env *retry = NULL;
static task_env *proposer[PROPOSERS];
static task_env *alive_t = NULL;

static uint32_t	my_id = 0; /* Unique id of this instance */
static synode_no current_message; /* Current message number */
static synode_no last_config_modification_id; /*Last configuration change proposal*/

synode_no get_current_message()
{
	return current_message;
}

static channel prop_input_queue;         /* Proposer task input queue */

/* purecov: begin deadcode */
channel *get_prop_input_queue()
{
	return & prop_input_queue;
}
/* purecov: end */

extern int	client_boot_done;
extern int	netboot_ok;
extern int	booting;

extern start_t start_type;
static linkage exec_wait = {0,&exec_wait, &exec_wait};           /* Executor will wake up tasks sleeping here */

/*
#define IGNORE_LOSERS
*/

#define BUILD_TIMEOUT 3.0

#define MAX_DEAD 10
static struct {
	int	n;
	unsigned long	id[MAX_DEAD];
} dead_sites;

synode_no get_max_synode()
{
	return max_synode;
}

static
void synode_set_to_event_horizon(synode_no *s)
{
  s->msgno += event_horizon + 1;
  s->node= 0;
}


/**
   Set node group
*/
void	set_group(uint32_t id)
{
	MAY_DBG(FN; STRLIT("changing group id of global variables "); NDBG(id,lx););
/*	set_group_id(id); */
	current_message.group_id = id;
	executed_msg.group_id = id;
	max_synode.group_id = id;
	set_log_group_id(id);
}


static void	bury_site(uint32_t id)
{
	if (id != 0) {
		dead_sites.id[dead_sites.n % MAX_DEAD] = id;
		dead_sites.n = (dead_sites.n + 1) % MAX_DEAD;
	}
}


static bool_t is_dead_site(uint32_t id)
{
	int	i = 0;
	for (i = 0; i < MAX_DEAD; i++) {
		if (dead_sites.id[i] == id)
			return TRUE;
		else if (dead_sites.id[i] == 0)
			return FALSE;
	}
	return FALSE;
}

extern node_set *init_node_set(node_set *set, u_int n);
extern node_set *alloc_node_set(node_set *set, u_int n);

#if 0
/* Find our previous message number. */
static synode_no decr_msgno(synode_no msgno)
{
	synode_no ret = msgno;
	ret.msgno--;
	ret.node = get_nodeno(find_site_def(ret)); /* In case site and node number have changed */
	return ret;
}
#endif

/* Find our next message number. */
static synode_no incr_msgno(synode_no msgno)
{
	synode_no ret = msgno;
	ret.msgno++;
	ret.node = get_nodeno(find_site_def(ret)); /* In case site and node number have changed */
	return ret;
}

#if 0
/* Given message number, compute which node it belongs to */
static unsigned int	msgno_to_node(synode_no msgno)
{
	return msgno.node;
}
#endif

synode_no incr_synode(synode_no synode)
{
	synode_no ret = synode;
	ret.node++;
	if (ret.node >= get_maxnodes(find_site_def(synode))) {
		ret.node = 0;
		ret.msgno++;
	}
/* 	DBGOUT(FN; SYCEXP(synode); SYCEXP(ret)); */
	return ret; /* Change this if we change message number type */
}


synode_no decr_synode(synode_no synode)
{
	synode_no ret = synode;
	if (ret.node == 0) {
		ret.msgno--;
		ret.node = get_maxnodes(find_site_def(ret));
	}
	ret.node--;
	return ret; /* Change this if we change message number type */
}


static void	skip_value(pax_msg *p)
{
	MAY_DBG(FN; SYCEXP(p->synode));
	p->op = learn_op;
	p->msg_type = no_op;
}


/* }}} */

/* {{{ Utilities and debug */

/* purecov: begin deadcode */
/* Log the exit code and tell xcom to terminate */
static void	pexitall(int i)
{
	DBGOUT(FN; NDBG(i, d); STRLIT("time "); NDBG(task_now(), f); );
	XCOM_FSM(xa_terminate, int_arg(i));	/* Tell xcom to stop */
}
/* purecov: end */

#ifndef WIN
/* Ignore this signal */
static int	ignoresig(int signum)
{
	struct sigaction act;
	struct sigaction oldact;

	memset(&act, 0, sizeof(act));
	act.sa_handler = SIG_IGN;
	memset(&oldact, 0, sizeof(oldact));

	return sigaction(signum,
	    &act,
	    &oldact);
}
#else
#define SIGPIPE 0
static int	ignoresig(int signum)
{
	return 0;
}
#endif

/* }}} */

#if 0
static void	dbg_machine_and_msg(pax_machine *p, pax_msg *pm)
{
  GET_GOUT;
  STRLIT("machine ");
  ADD_GOUT(dbg_pax_machine(p));
  STRLIT(" ");
  STRLIT("msg ");
  COPY_AND_FREE_GOUT(dbg_pax_msg(pm));
  PRINT_GOUT;
  FREE_GOUT;
}


#endif

static int	recently_active(pax_machine *p)
{
	MAY_DBG(FN;
			SYCEXP(p->synode); STRLIT(" op ");
            PTREXP(p); STRLIT(p->learner.msg ? pax_op_to_str(p->learner.msg->op) : "NULL");
	    NDBG(p->last_modified, f); NDBG(task_now(), f));
	return p->last_modified != 0.0 && (p->last_modified + 0.5 + median_time()) > task_now();
}


static inline int	finished(pax_machine *p)
{
	MAY_DBG(FN;
	    SYCEXP(p->synode); STRLIT(" op ");
	    PTREXP(p); STRLIT(p->learner.msg ? pax_op_to_str(p->learner.msg->op) : "NULL");
	    );
	return p->learner.msg && (p->learner.msg->op == learn_op || p->learner.msg->op == tiny_learn_op);
}


int	pm_finished(pax_machine *p)
{
	return finished(p);
}


static inline int	accepted(pax_machine *p)
{
	MAY_DBG(FN;
	    SYCEXP(p->synode); STRLIT(" op ");
	    PTREXP(p); STRLIT(p->acceptor.msg ? pax_op_to_str(p->acceptor.msg->op) : "NULL");
	    );
	return p->acceptor.msg && p->acceptor.msg->op != initial_op;
}


static inline int	accepted_noop(pax_machine *p)
{
	MAY_DBG(FN;
	    SYCEXP(p->synode); STRLIT(" op ");
	    PTREXP(p); STRLIT(p->acceptor.msg ? pax_op_to_str(p->acceptor.msg->op) : "NULL");
	    );
	return accepted(p) && p->acceptor.msg->msg_type == no_op;
}


static inline int	noop_match(pax_machine *p, pax_msg *pm)
{
	return pm->msg_type == no_op && accepted_noop(p);
}


static inline int	started(pax_machine *p)
{
	return
	    p->op != initial_op ||
	    (p->acceptor.promise.cnt > 0) ||
	    (p->proposer.msg && (p->proposer.msg->op != initial_op)) ||
	    accepted(p) ||
	    finished(p);
}


/* }}} */

void set_last_received_config(synode_no received_config_change)
{
  last_config_modification_id= received_config_change;
}

/* {{{ Definition of majority */
static inline node_no max_check(site_def const *site)
{
#ifdef MAXACCEPT
	return MIN(get_maxnodes(site), MAXACCEPT);
#else
	return get_maxnodes(site);
#endif
}

static site_def * forced_config = 0;

/* Definition of majority */
static inline int	majority(bit_set const *nodeset, site_def const *s, int all, int delay MY_ATTRIBUTE((unused)), int force)
{
	node_no ok = 0;
	node_no i = 0;
	int	retval = 0;
	node_no max = max_check(s);

 	/* DBGOUT(FN; NDBG(max,lu); NDBG(all,d); NDBG(delay,d); NDBG(force,d)); */

	/* Count nodes that have answered */
	for (i = 0; i < max; i++) {
		if (BIT_ISSET(i, nodeset)) {
			ok++;
		}
	}

	/* If we are forcing messages, attempt to ensure consistency by
	   requiring all remaining nodes to agree. Forced_config points to
	   the config that should be used as acceptors in this
	   case. Another possibility is to use the original config and
	   count the number of live nodes, but since the force flag is
	   being used only to force a new config, it seems safer to use
	   the new config and no time-dependent info. Note that we are
	   counting the answers based on the normal config, but use the
	   number of nodes from forced_config. This is safe, since we can
	   assume that the nodes that are not in forced_config will never
	   answer. */

	if(force){
		DBGOUT(FN; STRLIT("force majority"); NDBG(ok ,u);  NDBG(max ,u); NDBG(get_maxnodes(forced_config),u));
		return ok == get_maxnodes(forced_config);
	}else{
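		/* When all is set, require answers from all nodes; otherwise
		   require a strict majority (always satisfied in a two-node
		   group when ARBITRATOR_HACK is set) */
		retval = all ? ok == max
			: (ok > max / 2 || (ARBITRATOR_HACK && (2 == max)));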
		/* 	DBGOUT(FN; NDBG(max,lu); NDBG(all,d); NDBG(delay,d); NDBG(retval,d)); */
		return retval;
	}
}


#define IS_CONS_ALL(p) ((p)->proposer.msg->a ? (p)->proposer.msg->a->consensus == cons_all : 0)

/* See if a majority of acceptors have answered our prepare */
static int	prep_majority(site_def const * site, pax_machine *p)
{

	int	ok = 0;

	assert(p);
	assert(p->proposer.prep_nodeset);
	assert(p->proposer.msg);
	/* DBGOUT(FN; BALCEXP(p->proposer.bal)); */
	ok = majority(p->proposer.prep_nodeset, site, IS_CONS_ALL(p), p->proposer.bal.cnt == 1, p->proposer.msg->force_delivery || p->force_delivery);
	return ok;
}

/* See if a majority of acceptors have answered our propose */
static int	prop_majority(site_def const * site, pax_machine *p)
{
	int	ok = 0;

	assert(p);
	assert(p->proposer.prop_nodeset);
	assert(p->proposer.msg);
	/* DBGOUT(FN; BALCEXP(p->proposer.bal)); */
	ok = majority(p->proposer.prop_nodeset, site, IS_CONS_ALL(p), p->proposer.bal.cnt == 1, p->proposer.msg->force_delivery || p->force_delivery);
	return ok;
}

/* }}} */

/* {{{ Xcom thread */

/* purecov: begin deadcode */
/* Xcom thread start function */
gpointer xcom_thread_main(gpointer cp)
{
	G_MESSAGE("Starting xcom on port %d", atoi((char *)cp));
	xcom_thread_init();
	/* Initialize task system and enter main loop */
	taskmain((xcom_port)atoi((char *)cp));
	/* Xcom is finished when we get here */
	DBGOUT(FN; STRLIT("Deconstructing xcom thread"));
	xcom_thread_deinit();
	G_MESSAGE("Exiting xcom thread");
	return NULL;
}
/* purecov: end */
static site_def const * executor_site = 0;

site_def const * get_executor_site()
{
	return executor_site;
}

static site_def *proposer_site = 0;

site_def const *get_proposer_site()
{
	return proposer_site;
}


void	init_xcom_base()
{
	xcom_shutdown = 0;
	current_message = null_synode;
	executed_msg = null_synode;
	max_synode = null_synode;
	client_boot_done = 0;
	netboot_ok = 0;
	booting = 0;
	start_type = IDLE;

	xcom_recover_init();
	my_id = new_id();
	push_site_def(NULL);
/*	update_servers(NULL); */
	xcom_cache_var_init();
	median_filter_init();
	link_init(&exec_wait, type_hash("task_env"));
	executor_site = 0;
	proposer_site = 0;
}

static void	init_tasks()
{
	set_task(&boot, NULL);
	set_task(&net_boot, NULL);
	set_task(&net_recover, NULL);
	set_task(&killer, NULL);
	set_task(&executor, NULL);
	set_task(&retry, NULL);
	set_task(&detector, NULL);
	init_proposers();
	set_task(&alive_t, NULL);
	set_task(&sweeper, NULL);
}


/* Initialize the xcom thread */
void	xcom_thread_init()
{
#ifndef NO_SIGPIPE
	signal(SIGPIPE, SIG_IGN);
#endif
	init_base_vars();
	init_site_vars();
	init_crc32c();
	my_srand48((long int)task_now());

	init_xcom_base();
	init_tasks();
	init_cache();

	/* Initialize input queue */
	channel_init(&prop_input_queue, type_hash("msg_link"));
	init_link_list();
	task_sys_init();
}


/* Empty the proposer input queue */
static void	empty_prop_input_queue()
{
	empty_msg_channel(&prop_input_queue);
	MAY_DBG(FN; STRLIT("prop_input_queue empty"));
}


/* De-initialize the xcom thread */
void	xcom_thread_deinit()
{
	DBGOUT(FN; STRLIT("Empty proposer input queue"));
	empty_prop_input_queue();
	DBGOUT(FN; STRLIT("Empty link free list"));
	empty_link_free_list();
	DBGOUT(FN; STRLIT("De-initialize cache"));
	deinit_cache();
	garbage_collect_servers();
}

#define PROP_ITER int i;  for(i = 0; i < PROPOSERS; i++)

static bool_t force_recover = FALSE;
/* purecov: begin deadcode */
bool_t must_force_recover()
{
	return force_recover;
}


void	set_force_recover(bool_t const x)
{
	force_recover = x;
}
/* purecov: end */

static void	init_proposers()
{
	PROP_ITER {
		set_task(&proposer[i], NULL);
	}
}


static void	create_proposers()
{
	PROP_ITER {
		set_task(&proposer[i], task_new(proposer_task, int_arg(i), "proposer_task", XCOM_THREAD_DEBUG));
	}
}


static void	terminate_proposers()
{
	PROP_ITER {
		task_terminate(proposer[i]);
	}
}

static void free_forced_config_site_def()
{
  free_site_def(forced_config);
  forced_config= NULL;
}

#if TASK_DBUG_ON
static void	dbg_proposers() MY_ATTRIBUTE((unused));
static void	dbg_proposers()
{
	GET_GOUT;
	NDBG(PROPOSERS, d);
	{
		PROP_ITER {
			PPUT(proposer[i]);
		}
	}
	PRINT_GOUT;
	FREE_GOUT;
}
#endif

static void	set_proposer_startpoint()
{
	DBGOHK(FN; STRLIT("changing current message"));
	if (max_synode.msgno <= 1)
		set_current_message(first_free_synode(max_synode));
	else
		set_current_message(incr_msgno(first_free_synode(max_synode)));
}



void	check_tasks()
{
}


/* }}} */

/* {{{ Task functions */
/* purecov: begin deadcode */
/* Match any port */
static int yes(xcom_port port MY_ATTRIBUTE((unused)))
{
  return 1;
}

/* Create tasks and enter the task main loop */
int	taskmain(xcom_port listen_port)
{
	init_xcom_transport(listen_port);
	set_port_matcher(yes); /* For clients that use only addr, not addr:port  */

	MAY_DBG(FN; STRLIT("enter taskmain"));
	ignoresig(SIGPIPE);

	{
		result	fd = {0,0};

		if ((fd = announce_tcp(listen_port)).val < 0) {
			MAY_DBG(FN; STRLIT("cannot announce tcp "); NDBG(listen_port, d));
			task_dump_err(fd.funerr);
			g_critical("Unable to announce tcp port %d. Port already in use?", listen_port);
		}

		MAY_DBG(FN; STRLIT("Creating tasks"));
		task_new(generator_task, null_arg, "generator_task", XCOM_THREAD_DEBUG);
		task_new(tcp_server, int_arg(fd.val), "tcp_server", XCOM_THREAD_DEBUG);
		/* task_new(tcp_reaper_task, null_arg, "tcp_reaper_task", XCOM_THREAD_DEBUG); */
		/* task_new(xcom_statistics, null_arg, "xcom_statistics", XCOM_THREAD_DEBUG); */
		/* task_new(detector_task, null_arg, "detector_task", XCOM_THREAD_DEBUG); */
		MAY_DBG(FN; STRLIT("XCOM is listening on "); NPUT(listen_port, d));
	}

	task_loop();

	MAY_DBG(FN; STRLIT(" exit"));
	return 1;
}


void	start_run_tasks()
{
	force_recover = 0;
	client_boot_done = 1;
	netboot_ok = 1;
	booting = 0;
	set_proposer_startpoint();
	create_proposers();
	set_task(&executor, task_new(executor_task, null_arg, "executor_task", XCOM_THREAD_DEBUG));
	set_task(&sweeper, task_new(sweeper_task, null_arg, "sweeper_task", XCOM_THREAD_DEBUG));
	set_task(&detector, task_new(detector_task, null_arg, "detector_task", XCOM_THREAD_DEBUG));
	set_task(&alive_t, task_new(alive_task, null_arg, "alive_task", XCOM_THREAD_DEBUG));
}

/* Create tasks and enter the task main loop */
int	xcom_taskmain(xcom_port listen_port)
{
	init_xcom_transport(listen_port);

	MAY_DBG(FN; STRLIT("enter taskmain"));
	ignoresig(SIGPIPE);

	{
		result fd = {0,0};
		if ((fd = announce_tcp(listen_port)).val < 0) {
			MAY_DBG(FN; STRLIT("cannot announce tcp "); NDBG(listen_port, d));
			task_dump_err(fd.funerr);
			g_critical("Unable to announce tcp port %d. Port already in use?", listen_port);
			pexitall(1);
		}

		MAY_DBG(FN; STRLIT("Creating tasks"));
		/* task_new(generator_task, null_arg, "generator_task", XCOM_THREAD_DEBUG); */
		task_new(tcp_server, int_arg(fd.val), "tcp_server", XCOM_THREAD_DEBUG);
		task_new(tcp_reaper_task, null_arg, "tcp_reaper_task", XCOM_THREAD_DEBUG);
		/* task_new(xcom_statistics, null_arg, "xcom_statistics", XCOM_THREAD_DEBUG); */
		/* task_new(detector_task, null_arg, "detector_task", XCOM_THREAD_DEBUG); */
		MAY_DBG(FN; STRLIT("XCOM is listening on "); NPUT(listen_port, d));
	}

	start_run_tasks();
	task_loop();

	MAY_DBG(FN; STRLIT(" exit"));
	return 1;
}
/* purecov: end */
static xcom_state_change_cb xcom_run_cb = 0;
static xcom_state_change_cb xcom_terminate_cb = 0;
static xcom_state_change_cb xcom_comms_cb = 0;
static xcom_state_change_cb xcom_exit_cb = 0;
static xcom_state_change_cb xcom_expel_cb = 0;

void set_xcom_run_cb(xcom_state_change_cb x)
{
	xcom_run_cb = x;
}

void set_xcom_comms_cb(xcom_state_change_cb x)
{
  xcom_comms_cb = x;
}
/* purecov: begin deadcode */
void set_xcom_terminate_cb(xcom_state_change_cb x)
{
	xcom_terminate_cb = x;
}
/* purecov: end */
void set_xcom_exit_cb(xcom_state_change_cb x)
{
	xcom_exit_cb = x;
}

void set_xcom_expel_cb(xcom_state_change_cb x)
{
	xcom_expel_cb = x;
}

int	xcom_taskmain2(xcom_port listen_port)
{
  init_xcom_transport(listen_port);

	MAY_DBG(FN; STRLIT("enter taskmain"));
	ignoresig(SIGPIPE);

	 {
		result fd = {0,0};
		if ((fd = announce_tcp(listen_port)).val < 0) {
			MAY_DBG(FN; STRLIT("cannot announce tcp "); NDBG(listen_port, d));
			task_dump_err(fd.funerr);
			g_critical("Unable to announce tcp port %d. Port already in use?", listen_port);
			if(xcom_comms_cb){
				xcom_comms_cb(XCOM_COMMS_ERROR);
			}
			if(xcom_terminate_cb){
				xcom_terminate_cb(0);
			}
			return 1;
		}

		if(xcom_comms_cb){
			xcom_comms_cb(XCOM_COMMS_OK);
		}

		MAY_DBG(FN; STRLIT("Creating tasks"));
		/* task_new(generator_task, null_arg, "generator_task", XCOM_THREAD_DEBUG); */
		task_new(tcp_server, int_arg(fd.val), "tcp_server", XCOM_THREAD_DEBUG);
		task_new(tcp_reaper_task, null_arg, "tcp_reaper_task", XCOM_THREAD_DEBUG);
		/* task_new(xcom_statistics, null_arg, "xcom_statistics", XCOM_THREAD_DEBUG); */
		/* task_new(detector_task, null_arg, "detector_task", XCOM_THREAD_DEBUG); */
		MAY_DBG(FN; STRLIT("XCOM is listening on "); NPUT(listen_port, d));
	}

	task_loop();

#if defined(XCOM_HAVE_OPENSSL)
	xcom_cleanup_ssl();
#endif

	MAY_DBG(FN; STRLIT(" exit"));
	xcom_thread_deinit();
	return 1;
}


/* {{{ Paxos message construction and sending */

/* Initialize a message for sending */
static void	prepare(pax_msg *p, pax_op op)
{
	p->op = op;
	p->reply_to = p->proposal;
}


/* Initialize a prepare_msg */
static int	prepare_msg(pax_msg *p)
{
	prepare(p, prepare_op);
	/* p->msg_type = normal; */
	return send_to_acceptors(p, "prepare_msg");
}


/* Initialize a noop_msg */
static pax_msg *create_noop(pax_msg *p)
{
	prepare(p, prepare_op);
	p->msg_type = no_op;
	return p;
}


/* Initialize a read_msg */
static pax_msg *create_read(site_def const * site, pax_msg *p)
{
	p->msg_type = normal;
	p->proposal.node = get_nodeno(site);
	prepare(p, read_op);
	return p;
}


static int	skip_msg(pax_msg *p)
{
	prepare(p, skip_op);
	MAY_DBG(FN; STRLIT("skipping message "); SYCEXP(p->synode));
	p->msg_type = no_op;
	return send_to_all(p, "skip_msg");
}


static void	brand_app_data(pax_msg *p)
{
	if (p->a) {
		p->a->app_key.msgno = p->synode.msgno;
		p->a->app_key.node = p->synode.node;
		p->a->app_key.group_id = p->a->group_id = p->synode.group_id;
	}
}


static synode_no my_unique_id(synode_no synode)
{
	assert(my_id != 0);
/* Random number derived from node number and timestamp which uniquely defines this instance */
	synode.group_id = my_id;
	return synode;
}


static void	set_unique_id(pax_msg *msg, synode_no synode )
{
	app_data_ptr a = msg->a;
	while (a) {
		a->unique_id = synode;
		a = a->next;
	}
}


static int	propose_msg(pax_msg *p)
{
	p->op = accept_op;
	p->reply_to = p->proposal;
	brand_app_data(p);
	/* set_unique_id(p, my_unique_id(synode)); */
	return send_to_acceptors(p, "propose_msg");
}


static void	set_learn_type(pax_msg *p)
{
	p->op = learn_op;
	p->msg_type = p->a ? normal : no_op;
}

/* purecov: begin deadcode */
static int	learn_msg(site_def const * site, pax_msg *p)
{
	set_learn_type(p);
	p->reply_to = p->proposal;
	brand_app_data(p);
	MAY_DBG(FN;
	    dbg_bitset(p->receivers, get_maxnodes(site));
	    );
	return send_to_all_site(site, p, "learn_msg");
}
/* purecov: end */
static int	tiny_learn_msg(site_def const *site, pax_msg *p)
{
	int	retval;
	pax_msg * tmp = clone_pax_msg_no_app(p);
	pax_machine * pm = get_cache(p->synode);

	ref_msg(tmp);
	tmp->msg_type = p->a ? normal : no_op;
	tmp->op = tiny_learn_op;
	tmp->reply_to = pm->proposer.bal;
	brand_app_data(tmp);
	MAY_DBG(FN;
	    dbg_bitset(tmp->receivers, get_maxnodes(site));
	    );
	retval = send_to_all_site(site, tmp, "tiny_learn_msg");
	unref_msg(&tmp);
	return retval;
}


/* }}} */

/* {{{ Proposer task */

static void	prepare_push_3p(site_def const * site, pax_machine *p, pax_msg *msg, synode_no msgno)
{
	MAY_DBG(FN;
	    SYCEXP(msgno);
	    NDBG(p->proposer.bal.cnt, d); NDBG(p->acceptor.promise.cnt, d));
	p->proposer.bal.node = get_nodeno(site);
	 {
		int	maxcnt = MAX(p->proposer.bal.cnt, p->acceptor.promise.cnt);
		p->proposer.bal.cnt = ++maxcnt;
	}
	msg->synode = msgno;
	msg->proposal = p->proposer.bal;
}
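
/**
   Illustration only: prepare_push_3p() starts a new three-phase round with a
   ballot whose counter is one more than the larger of its own previous counter
   and the highest promise it has seen, tagged with this node's number. In the
   hypothetical sketch below, ballot_sketch stands in for the xcom ballot, and
   the counter-then-node ordering in ballot_gt_sketch is an assumption made for
   illustration.

```c
#include <assert.h>

// Hypothetical stand-in for a Paxos ballot: a counter plus the proposing node.
typedef struct {
  int cnt;
  unsigned int node;
} ballot_sketch;

// Assumed ordering: compare counters first, break ties on node number.
int ballot_gt_sketch(ballot_sketch a, ballot_sketch b)
{
  return a.cnt > b.cnt || (a.cnt == b.cnt && a.node > b.node);
}

// New ballot: one more than the larger of our counter and the promised
// counter, tagged with our own node number (as prepare_push_3p() does).
ballot_sketch next_ballot_sketch(ballot_sketch mine, ballot_sketch promised,
                                 unsigned int self)
{
  ballot_sketch b;
  b.cnt = (mine.cnt > promised.cnt ? mine.cnt : promised.cnt) + 1;
  b.node = self;
  return b;
}
```

   Because the new counter is strictly greater than both inputs, the new
   ballot outranks both of them whatever the node tie-break.
*/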


static void	push_msg_2p(site_def const * site, pax_machine *p)
{
	assert(p->proposer.msg);

	BIT_ZERO(p->proposer.prop_nodeset);
	MAY_DBG(FN; SYCEXP(p->synode));
	p->proposer.bal.cnt = 0;
	p->proposer.bal.node = get_nodeno(site);
	p->proposer.msg->proposal = p->proposer.bal;
	p->proposer.msg->synode = p->synode;
	p->proposer.msg->force_delivery = p->force_delivery;
	propose_msg(p->proposer.msg);
}


static void	push_msg_3p(site_def const * site, pax_machine *p, pax_msg *msg, synode_no msgno, pax_msg_type msg_type)
{
	assert(msgno.msgno != 0);
	prepare_push_3p(site, p, msg, msgno);
	msg->msg_type = msg_type;
	BIT_ZERO(p->proposer.prep_nodeset);
	assert(p->proposer.msg);
	msg->force_delivery = p->force_delivery;
	prepare_msg(msg);
	MAY_DBG(FN; BALCEXP(msg->proposal);
	    SYCEXP(msgno); STRLIT(" op "); STRLIT(pax_op_to_str(msg->op)));
}


/* Brand client message with unique ID */
static void	brand_client_msg(pax_msg *msg, synode_no msgno)
{
	assert(!synode_eq(msgno, null_synode));
	set_unique_id(msg, my_unique_id(msgno));
}

/* purecov: begin deadcode */
static int	reject_send(site_def const * site, recover_action r)
{
	return r != rec_send && xcom_booted() && (!site || !enough_live_nodes(site));
}
/* purecov: end */

void	xcom_send(app_data_ptr a, pax_msg *msg)
{
	MAY_DBG(FN; PTREXP(a); SYCEXP(a->app_key); SYCEXP(msg->synode));
	msg->a = a;
	msg->op = client_msg;
	 {
		msg_link * link = msg_link_new(msg, VOID_NODE_NO);
		MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_pax_msg(msg)));
		channel_put(&prop_input_queue, &link->l);
	}
}

/* purecov: begin deadcode */
static int	generator_task(task_arg arg MY_ATTRIBUTE((unused)))
{
	DECL_ENV
	    int	dummy;
	END_ENV;

	TASK_BEGIN
	    MAY_DBG(FN; );
	check_tasks(); /* Start tasks which should be running */
	for(;;) {
		app_data_ptr a = 0;
		while (a) {
			assert(!(a->chosen && synode_eq(a->app_key, null_synode)));
			MAY_DBG(FN; PTREXP(a); SYCEXP(a->app_key));
			MAY_DBG(FN;
			    COPY_AND_FREE_GOUT(dbg_app_data(a));
			    );
			if (a->body.c_t == exit_type) {
				bury_site(get_group_id(get_site_def()));
				copy_app_data(&a, NULL);
				task_terminate_all();    /* Kill, kill, kill, kill, kill, kill. This is the end. */

				init_xcom_base();             /* Reset shared variables */
				init_tasks();            /* Reset task variables */
				free_site_defs();
				free_forced_config_site_def();
				garbage_collect_servers();
				DBGOUT(FN; STRLIT("shutting down"));
				xcom_shutdown = 1;
				TERMINATE;
			} else if (a->body.c_t == reset_type || a->body.c_t == remove_reset_type) {
				if(a->body.c_t == reset_type) /* Not for remove node */
					bury_site(get_group_id(get_site_def()));
				copy_app_data(&a, NULL);
				init_xcom_base();             /* Reset shared variables */
				check_tasks(); /* Stop tasks which should not be running */
				free_site_defs();
				free_forced_config_site_def();
				garbage_collect_servers();
			} else {
				if (reject_send(get_site_def(), a->recover)) {
					copy_app_data(&a, NULL);
				} else {
					pax_msg * msg = pax_msg_new(null_synode, get_site_def());
					if (is_real_recover(a)) {
						msg->start_type = RECOVER;
						if (force_recover) {
							/* We are desperate to recover,
							   fake an accepted message with null key */
							DBGOUT(FN; STRLIT("forcing recovery "));
							a->chosen = TRUE;
						}
					}
					xcom_send(a, msg);
				}
			}
		}

		TASK_DELAY(0.1);
	}
	FINALLY
	    TASK_END;
}
/* purecov: end */

#define FNVSTART 0x811c9dc5

/* Fowler-Noll-Vo (FNV-1 style) multiplicative hash: multiply by the FNV prime, then XOR in each byte */
static uint32_t fnv_hash(unsigned char *buf, size_t length, uint32_t sum)
{
  size_t i = 0;
  for (i = 0; i < length; i++) {
    sum = sum * (uint32_t)0x01000193 ^ (uint32_t)buf[i];
  }
  return sum;
}
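
/**
   Illustration only: fnv_hash() multiplies by the 32-bit FNV prime 0x01000193
   and then XORs in the next byte, which is the FNV-1 ordering (FNV-1a XORs
   first). The standalone sketch below uses a hypothetical name,
   fnv1_32_sketch, and is not called anywhere. Seeded with the offset basis
   0x811c9dc5 (the FNVSTART value), it gives the FNV-1 result for "a",
   0x050c5d7e. new_id() seeds fnv_hash() with 0 instead. With that seed, a
   single byte hashes to itself.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define FNV1_32_PRIME 0x01000193u // 32-bit FNV prime
#define FNV1_32_BASIS 0x811c9dc5u // 32-bit FNV offset basis (the FNVSTART value)

// FNV-1: for each byte, multiply by the prime first, then XOR in the byte.
// The uint32_t result keeps only the low 32 bits, as the algorithm expects.
uint32_t fnv1_32_sketch(const unsigned char *buf, size_t length, uint32_t sum)
{
  size_t i;
  for (i = 0; i < length; i++)
    sum = (sum * FNV1_32_PRIME) ^ (uint32_t)buf[i];
  return sum;
}
```

   An empty buffer returns the seed unchanged, so the seed acts as the
   initial hash state.
*/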

/**
   Create a new (hopefully unique) ID. The basic idea is to create a hash from
   the host ID and a timestamp.
*/
uint32_t new_id()
{
	long	id = get_unique_long();
	double	timestamp = task_now();
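	uint32_t retval = 0;
	uint32_t attempt = 0;
	while (retval == 0 ||
	    is_dead_site(retval)) { /* Avoid returning 0 or already used site id */
		retval = fnv_hash((unsigned char *) & id, sizeof(id), 0);
		retval = fnv_hash((unsigned char *) & timestamp, sizeof(timestamp), retval);
		/* On a retry, also hash the attempt number; with unchanged inputs the
		   rejected value would be recomputed and the loop would never end */
		if (attempt > 0)
			retval = fnv_hash((unsigned char *) & attempt, sizeof(attempt), retval);
		attempt++;
	}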
	return retval;
}
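
/**
   Illustration only: a retry loop like the one in new_id() makes progress only
   if some input to the hash changes between passes. With fixed inputs, a
   candidate that is 0 or already in use is simply recomputed. The standalone
   sketch below salts each retry with an attempt counter. The names
   new_id_sketch, fnv1_bytes and is_blocked_example are hypothetical, and the
   in_use callback stands in for is_dead_site().

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// FNV-1 over raw bytes, as in fnv_hash(): multiply by the prime, then XOR.
static uint32_t fnv1_bytes(const void *data, size_t length, uint32_t sum)
{
  const unsigned char *buf = (const unsigned char *)data;
  size_t i;
  for (i = 0; i < length; i++)
    sum = (sum * 0x01000193u) ^ (uint32_t)buf[i];
  return sum;
}

// Hypothetical predicate standing in for is_dead_site().
typedef int (*id_in_use_fn)(uint32_t id);

// Hash (host id, timestamp); on a retry, also hash the attempt number so that
// each pass yields a new candidate instead of recomputing the rejected one.
uint32_t new_id_sketch(long host_id, double timestamp, id_in_use_fn in_use)
{
  uint32_t retval = 0;
  uint32_t attempt = 0;
  while (retval == 0 || in_use(retval)) {
    retval = fnv1_bytes(&host_id, sizeof(host_id), 0);
    retval = fnv1_bytes(&timestamp, sizeof(timestamp), retval);
    if (attempt > 0)
      retval = fnv1_bytes(&attempt, sizeof(attempt), retval);
    attempt++;
  }
  return retval;
}

// Example predicate for illustration: treats a single blocked ID as in use.
uint32_t blocked_id_example = 0;
int is_blocked_example(uint32_t id) { return id == blocked_id_example; }
```

   The first pass hashes only the host ID and the timestamp. When nothing is
   in use, the sketch therefore returns the same value that a salt-free
   version would.
*/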

static synode_no getstart(app_data_ptr a)
{
	synode_no retval = null_synode;
	G_DEBUG("getstart group_id %x", a ? a->group_id : null_id);
	if (!a || a->group_id == null_id) {
		retval.group_id = new_id();
	} else {
		a->app_key.group_id = a->group_id;
		retval = a->app_key;
		if (get_site_def() && retval.msgno != 1) {
			/* Not valid until after event horizon has been passed */
			synode_set_to_event_horizon(&retval);
		}
	}
	return retval;
}

void site_install_action(site_def *site, cargo_type operation)
{
	DBGOUT(FN; NDBG(get_nodeno(get_site_def()), u));
	if (synode_gt(site->start, max_synode))
		set_max_synode(site->start);
	site->nodeno = xcom_find_node_index(&site->nodes);
	push_site_def(site);
	DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_site_def(site)));
	set_group(get_group_id(site));
	if(get_maxnodes(get_site_def())){
		update_servers(site, operation);
	}
	site->install_time = task_now();
	DBGOUT(FN; SYCEXP(site->start); SYCEXP(site->boot_key));
	DBGOUT(FN; NDBG(get_nodeno(site), u));
	DBGOUT(SYCEXP(site->start); SYCEXP(site->boot_key); NDBG(site->install_time,f));
	DBGOUT(NDBG(get_nodeno(site), u));
}

static site_def *create_site_def_with_start(app_data_ptr a, synode_no start)
{
	site_def * site = new_site_def();
	MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_list(&a->body.app_u_u.nodes)); );
	init_site_def(a->body.app_u_u.nodes.node_list_len,
				  a->body.app_u_u.nodes.node_list_val, site);
	site->start = start;
	site->boot_key = a->app_key;
	return site;
}


static site_def * install_ng_with_start(app_data_ptr a, synode_no start)
{
	if (a) {
		site_def *site = create_site_def_with_start(a, start);
		site_install_action(site, a->body.c_t);
		return site;
	}
	return 0;
}


site_def *install_node_group(app_data_ptr a)
{
	if (a) {
		ADD_EVENTS(
		    add_event(string_arg("a->app_key"));
		    add_synode_event(a->app_key);
		    );
		return install_ng_with_start(a, getstart(a));
	}
	return 0;
}

/* purecov: begin deadcode */
int	is_real_recover(app_data_ptr a)
{
	return  a && a->body.c_t == xcom_recover && a->body.app_u_u.rep.msg_list.synode_no_array_len > 0;
}
/* purecov: end */

void	set_max_synode(synode_no synode)
{
	max_synode = synode; /* Track max synode number */
	MAY_DBG(FN; STRLIT("new "); SYCEXP(max_synode));
}

/* purecov: begin deadcode */
static void	learn_accepted_value(site_def const * site, pax_msg *p, synode_no synode)
{
	pax_msg * msg = pax_msg_new(synode, site);
	ref_msg(msg);
	copy_app_data(&msg->a, p->a);
	msg->start_type = p->start_type;
	set_learn_type(msg);
	MAY_DBG(FN; STRLIT("trying to learn known value "); SYCEXP(synode));
	send_to_all_site(site, msg, "learn_accepted_value");
	unref_msg(&msg);
}
/* purecov: end */

static int	is_busy(synode_no s)
{
	pax_machine * p = hash_get(s);
	if (!p) {
		return 0;
	} else {
		return started(p);
	}
}

#if 0
static synode_no find_slot(synode_no msgno, site_def **site)
{
	assert(!synode_eq(msgno, null_synode));
	while (is_busy(msgno)) {
		msgno = incr_msgno(msgno);
	}
	assert(!synode_eq(msgno, null_synode));
	*site = find_site_def_rw(msgno);
	return msgno;
}
#endif

bool_t match_my_msg(pax_msg *learned, pax_msg *mine)
{
	MAY_DBG(FN; PTREXP(learned->a);
	    if (learned->a)
	    SYCEXP(learned->a->unique_id);
	    PTREXP(mine->a);
	    if (mine->a)
	    SYCEXP(mine->a->unique_id);
	    )		;
	if (learned->a && mine->a) { /* Both have app data, see if data is mine */
		return synode_eq(learned->a->unique_id, mine->a->unique_id);
	} else if (!(learned->a || mine->a)) { /* None have app data, anything goes */
		return TRUE;
	} else { /* Definitely mismatch */
		return FALSE;
	}
}


#if TASK_DBUG_ON
static void	dbg_reply_set(site_def const * site, const char *s, bit_set *bs)
{
	unsigned int	i = 0;
	unsigned int	n = get_maxnodes(site);
	GET_GOUT;
	STRLIT(s);
	for (i = 0; i <  n && i < bs->bits.bits_len * sizeof(*bs->bits.bits_val) * BITS_PER_BYTE; i++) {
		NPUT(BIT_ISSET(i, bs), d);
	}
	PRINT_GOUT;
	FREE_GOUT;
}
#endif


static void	propose_noop(synode_no find, pax_machine *p);

static inline int too_far(synode_no s)
{
	return s.msgno >= executed_msg.msgno + event_horizon;
}
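
/**
   Illustration only: too_far() restricts proposals to a window of
   event_horizon message numbers that starts at executed_msg.
   proposer_task() waits on exec_wait while the next slot lies outside that
   window. The hypothetical sketch below uses plain integers in place of
   synode_no to show where the window ends. The integer widths are
   assumptions.

```c
#include <assert.h>
#include <stdint.h>

// Hypothetical mirror of too_far(): a message number is too far ahead once it
// reaches executed_msgno + event_horizon.
int too_far_sketch(uint64_t msgno, uint64_t executed_msgno, uint32_t event_horizon)
{
  return msgno >= executed_msgno + event_horizon;
}
```

   With executed_msg.msgno == 100 and event_horizon == 10, message numbers
   100 through 109 may be proposed, and 110 is too far.
*/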

#define GOTO(x) {DBGOUT(STRLIT("goto "); STRLIT(#x)); goto x; }

static inline int is_view(cargo_type x)
{
	return x == view_msg;
}

static inline int is_config(cargo_type x)
{
	return x == unified_boot_type ||
	    x == add_node_type ||
	    x == remove_node_type ||
	    x == force_config_type;
}

static void terminate_and_exit();

/* Send messages by fetching them from the input queue and trying to get them accepted
   by a Paxos instance */
static int	proposer_task(task_arg arg)
{
	DECL_ENV
	    int	self;       /* ID of this proposer task */
	pax_machine * p; /* Pointer to Paxos instance */
	msg_link * client_msg; /* The client message we are trying to push */
	synode_no msgno;
	pax_msg * prepare_msg;
	double	start_propose;
	double	start_push;
	double	delay;
	site_def const *site;
	size_t size;
	END_ENV;

	TASK_BEGIN

	  ep->self = get_int_arg(arg);
	ep->p = NULL;
	ep->client_msg = NULL;
	ep->prepare_msg = NULL;
	ep->start_propose = 0.0;
	ep->start_push = 0.0;
	ep->delay = 0.0;
	ep->msgno = current_message;
	ep->site = 0;
	ep->size = 0;

	MAY_DBG(FN; NDBG(ep->self, d); NDBG(task_now(), f));

	while (!xcom_shutdown) { /* Loop until no more work to do */
		int	MY_ATTRIBUTE((unused)) lock = 0;
		/* Wait for client message */
		assert(!ep->client_msg);
		CHANNEL_GET(&prop_input_queue, &ep->client_msg, msg_link);
		MAY_DBG(FN; PTREXP(ep->client_msg->p->a); STRLIT("extracted "); SYCEXP(ep->client_msg->p->a->app_key));

		/* Grab the rest of the messages in the queue as well, but never batch
		   config or view messages, which need a unique message number */

		if (!is_config(ep->client_msg->p->a->body.c_t) && !is_view(ep->client_msg->p->a->body.c_t)) {
			ep->size = app_data_size(ep->client_msg->p->a);
			while (AUTOBATCH && ep->size <= MAX_BATCH_SIZE &&
			    !link_empty(&prop_input_queue.data)) { /* Batch payloads into a single message */
				msg_link *tmp;
				app_data_ptr atmp;

				CHANNEL_GET(&prop_input_queue, &tmp, msg_link);
				atmp = tmp->p->a;
				ep->size += app_data_size(atmp);
				/* Stop batching at a config or view message, or if the batch would
				   become too big, and put the message back at the front of the queue */
				if (is_config(atmp->body.c_t) || is_view(atmp->body.c_t) ||
				    ep->size > MAX_BATCH_SIZE) {
					channel_put_front(&prop_input_queue, &tmp->l);
					break;
				}
				ADD_T_EV(seconds(), __FILE__, __LINE__, "batching");

				tmp->p->a = 0;                     /* Steal this payload */
				msg_link_delete(&tmp);             /* Get rid of the empty message */
				atmp->next = ep->client_msg->p->a; /* Add to list of app_data */
				G_TRACE("Batching %s %s", cargo_type_to_str(ep->client_msg->p->a->body.c_t),
				    cargo_type_to_str(atmp->body.c_t));
				ep->client_msg->p->a = atmp;
				MAY_DBG(FN; PTREXP(ep->client_msg->p->a); STRLIT("extracted "); SYCEXP(ep->client_msg->p->a->app_key));
			}
		}
		ep->start_propose = task_now();
		ep->delay = 0.0;


		assert(!(AUTOBATCH && ep->client_msg->p->a->chosen));

		/* See if value is known already (old message) */
		if (ep->client_msg->p->a->chosen) {
			DBGOUT(FN; PTREXP(ep->client_msg->p->a); STRLIT("pushing old "); SYCEXP(ep->client_msg->p->a->app_key));
			MAY_DBG(FN;
			    COPY_AND_FREE_GOUT(dbg_pax_msg(ep->client_msg->p));
			    );
			ep->msgno = ep->client_msg->p->a->app_key;
			ep->site = find_site_def(ep->msgno);
			if(!ep->site) /* Use current site if message is too old */
				ep->site = get_site_def();

			/* See if we can do anything with this message */
#if 0
	    	if (!ep->site || get_nodeno(ep->site) == VOID_NODE_NO) {
				/* Give up */
				deliver_to_app(NULL, ep->client_msg->p->a, delivery_failure);
				GOTO(next);
			}
#endif
retry_old:
			ep->p = get_cache(ep->msgno);
			assert(ep->p);
			lock = lock_pax_machine(ep->p);
			assert(!lock);

			/* Try to get a value accepted */
			learn_accepted_value(ep->site, ep->client_msg->p, ep->msgno);
			while (!finished(ep->p)) {
				/* Sleep here if value is not already chosen */
				TIMED_TASK_WAIT(&ep->p->rv, ep->delay = wakeup_delay(ep->delay));
				if (!synode_eq(ep->msgno, ep->p->synode)) {
					DBGOUT(FN; STRLIT("proposer_task detected stolen state machine, retry"); );
					/* unlock_pax_machine(ep->p); */
					GOTO(retry_old);
				}
				assert(synode_eq(ep->msgno, ep->p->synode));
				learn_accepted_value(ep->site, ep->client_msg->p, ep->msgno);
			}
			unlock_pax_machine(ep->p);
			msg_link_delete(&ep->client_msg);
			continue;
		}

		/* It is a new message */

		assert(!synode_eq(current_message, null_synode));

retry_new:
		/* Find a free slot */

		assert(!synode_eq(current_message, null_synode));
		ep->msgno = current_message;
		while (is_busy(ep->msgno)) {
			while (/* ! ep->client_msg->p->force_delivery &&  */ too_far(incr_msgno(ep->msgno))) { /* Too far ahead of executor */
				TIMED_TASK_WAIT(&exec_wait, 1.0);
				DBGOUT(FN; TIMECEXP(ep->start_propose); TIMECEXP(ep->client_msg->p->a->expiry_time); TIMECEXP(task_now());
				    NDBG(enough_live_nodes(ep->site), d));
			}
			ep->msgno = incr_msgno(ep->msgno);
		}
		assert(!synode_eq(ep->msgno, null_synode));
		proposer_site = find_site_def_rw(ep->msgno);

		ep->site = proposer_site;

		/* See if we can do anything with this message */
#if 1
		if (!ep->site || get_nodeno(ep->site) == VOID_NODE_NO) {
			/* Give up */
			deliver_to_app(NULL, ep->client_msg->p->a, delivery_failure);
			GOTO(next);
		}
#endif
		DBGOHK(FN; STRLIT("changing current message"));
		set_current_message(ep->msgno);

		brand_client_msg(ep->client_msg->p, ep->msgno);
		ep->client_msg->p->a->lsn = ep->msgno.msgno;

		for(;;) { /* Loop until the client message has been learned */
			/* Get a Paxos instance to send the client message */
			ep->p = get_cache(ep->msgno);
			assert(ep->p);
			if(ep->client_msg->p->force_delivery)
				ep->p->force_delivery = ep->client_msg->p->force_delivery;
			lock = lock_pax_machine(ep->p);
			assert(!lock);

			/* Set the client message as current proposal */
			assert(ep->client_msg->p);
			replace_pax_msg(&ep->p->proposer.msg, clone_pax_msg(ep->client_msg->p));
      if (ep->p->proposer.msg == NULL) {
        g_critical("Node %u has run out of memory while sending a message and "
                   "will now exit.",
                   get_nodeno(proposer_site));
        terminate_and_exit();
        TERMINATE;
      }
			assert(ep->p->proposer.msg);
			PAX_MSG_SANITY_CHECK(ep->p->proposer.msg);

			/* Create the prepare message */
			unchecked_replace_pax_msg(&ep->prepare_msg,
				pax_msg_new(ep->msgno, ep->site));
			DBGOUT(FN; PTREXP(ep->client_msg->p->a); STRLIT("pushing "); SYCEXP(ep->msgno));
			MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_app_data(ep->prepare_msg->a)));

			if(threephase || ep->p->force_delivery){
				push_msg_3p(ep->site, ep->p, ep->prepare_msg, ep->msgno, normal);
			}else{
				push_msg_2p(ep->site, ep->p);
			}

			ep->start_push = task_now();

			while (!finished(ep->p)) { /* Try to get a value accepted */
				/* We will wake up periodically, and whenever a message arrives */
				TIMED_TASK_WAIT(&ep->p->rv, ep->delay = wakeup_delay(ep->delay));
				if (!synode_eq(ep->msgno, ep->p->synode) || ep->p->proposer.msg == NULL) {
					DBGOHK(FN; STRLIT("detected stolen state machine, retry"); );
					/* unlock_pax_machine(ep->p); */
					GOTO(retry_new); /* Need to break out of both loops,
										and we have no "exit named loop" construction */
				}
				assert(synode_eq(ep->msgno, ep->p->synode) &&  ep->p->proposer.msg);
				if (finished(ep->p))
					break;
				 {
					 double	now = task_now();
					if ((ep->start_push + ep->delay) <= now) {
						PAX_MSG_SANITY_CHECK(ep->p->proposer.msg);
						DBGOUT(FN; STRLIT("retry pushing "); SYCEXP(ep->msgno));
						MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_app_data(ep->prepare_msg->a));
						    );
						DBGOUT(BALCEXP(ep->p->proposer.bal);
						    BALCEXP(ep->p->acceptor.promise));
						MAY_DBG(FN; dbg_reply_set(ep->site, "prep_node_set", ep->p->proposer.prep_nodeset);
						    dbg_reply_set(ep->site, "prop_node_set", ep->p->proposer.prop_nodeset);
						    );
						push_msg_3p(ep->site, ep->p, ep->prepare_msg, ep->msgno, normal);
						ep->start_push = now;
					}
				}
			}
			/* When we get here, we know the value for this message number,
			   but it may not be the value we tried to push,
			   so loop until we have a successful push. */
			unlock_pax_machine(ep->p);
			MAY_DBG(FN; STRLIT(" found finished message ");
			    SYCEXP(ep->msgno);
			    STRLIT("seconds since last push ");
			    NPUT(task_now() - ep->start_push, f);
			    STRLIT("ep->client_msg ");
			    COPY_AND_FREE_GOUT(dbg_pax_msg(ep->client_msg->p));
			    );
			MAY_DBG(FN; STRLIT("ep->p->learner.msg ");
			    COPY_AND_FREE_GOUT(dbg_pax_msg(ep->p->learner.msg));
			    );
			if (match_my_msg(ep->p->learner.msg, ep->client_msg->p)){
				break;
			} else
				GOTO(retry_new);
		}
next:
		 {
			double	now = task_now();
			double	used = now - ep->start_propose;
			add_to_filter(used);
			DBGOUT(FN; STRLIT("completed ep->msgno ");
			    SYCEXP(ep->msgno); NDBG(used, f); NDBG(median_time(), f);
			    STRLIT("seconds since last push "); NDBG(now - ep->start_push, f); );
			MAY_DBG(FN; STRLIT("ep->client_msg ");
			    COPY_AND_FREE_GOUT(dbg_pax_msg(ep->client_msg->p));
			    );
			if (ep->p) {
				MAY_DBG(FN; STRLIT("ep->p->learner.msg ");
						COPY_AND_FREE_GOUT(dbg_pax_msg(ep->p->learner.msg));
				    );
			}
			msg_link_delete(&ep->client_msg);
		}
	}
	FINALLY
	    MAY_DBG(FN; STRLIT("exit "); NDBG(ep->self, d); NDBG(task_now(), f));
	if (ep->p)
		unlock_pax_machine(ep->p);
	replace_pax_msg(&ep->prepare_msg, NULL);
	if (ep->client_msg) { /* If we get here with a client message, we have failed to deliver */
		deliver_to_app(ep->p, ep->client_msg->p->a, delivery_failure);
		msg_link_delete(&ep->client_msg);

	}
	TASK_END;
}
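
/**
   Illustration only: when AUTOBATCH is enabled, the top of proposer_task()
   lets a non-config message taken from prop_input_queue absorb further
   queued payloads. Batching stops when the next payload is a config or view
   message, or when adding it would push the accumulated size past
   MAX_BATCH_SIZE. That payload is put back at the front of the queue. The
   hypothetical sketch below applies the same policy to an array and returns
   how many leading entries form one batch. item_sketch and max_batch are
   assumptions standing in for app_data and MAX_BATCH_SIZE.

```c
#include <assert.h>
#include <stddef.h>

// Hypothetical queued item: payload size and whether it is a config/view message.
typedef struct {
  size_t size;
  int is_config;
} item_sketch;

// Number of items, starting at queue[0], that go into one batch.
// A leading config/view item always travels alone. Otherwise batching stops
// before the first config/view item or before the item that would push the
// running total past max_batch.
size_t batch_count_sketch(const item_sketch *queue, size_t n, size_t max_batch)
{
  size_t count, total;
  if (n == 0)
    return 0;
  if (queue[0].is_config)
    return 1;
  count = 1;
  total = queue[0].size;
  while (count < n && total <= max_batch) {
    if (queue[count].is_config || total + queue[count].size > max_batch)
      break; // the original puts this item back at the front of the queue
    total += queue[count].size;
    count++;
  }
  return count;
}
```

   As in the original, the first payload is taken even if it alone exceeds
   the limit. Only the payloads that follow it are checked against the
   limit.
*/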


/* }}} */


/* {{{ Executor task */

static node_no	leader(site_def const *s)
{
	node_no	leader = 0;
	for (leader = 0; leader < get_maxnodes(s); leader++) {
		if (!may_be_dead(s->detected, leader, task_now()))
			return leader;
	}
	return 0;
}


int	iamthegreatest(site_def const *s)
{
	return leader(s) == s->nodeno;
}
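
/**
   Illustration only: leader() returns the lowest-numbered node that the
   failure detector does not suspect. When every node is suspected, it falls
   back to node 0. iamthegreatest() checks whether that node is this one. In
   the hypothetical sketch below, an array of suspicion flags stands in for
   may_be_dead(s->detected, i, task_now()).

```c
#include <assert.h>

// suspected[i] != 0 stands in for may_be_dead(s->detected, i, task_now()).
unsigned int leader_sketch(const int *suspected, unsigned int maxnodes)
{
  unsigned int i;
  for (i = 0; i < maxnodes; i++) {
    if (!suspected[i])
      return i;
  }
  return 0; // every node suspected: fall back to node 0
}

int iamthegreatest_sketch(const int *suspected, unsigned int maxnodes,
                          unsigned int self)
{
  return leader_sketch(suspected, maxnodes) == self;
}
```

   Every node evaluates the same rule on its own detector state. The nodes
   therefore agree on a leader only as far as their suspicions agree.
*/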


void	execute_msg(site_def const * site, pax_machine *pma, pax_msg *p)
{
	app_data_ptr a = p->a;
	DBGOUT(FN;
	    COPY_AND_FREE_GOUT(dbg_pax_msg(p));
	    );
	if (a) {
		switch (p->a->body.c_t) {
		case unified_boot_type:
		case add_node_type:
		case remove_node_type:
		case force_config_type:
			check_tasks();
			break;
		case xcom_recover:
/* purecov: begin deadcode */
			break;
/* purecov: end */
		case app_type:
			MAY_DBG(FN; STRLIT(" learner.msg ");
			    COPY_AND_FREE_GOUT(dbg_pax_msg(pma->learner.msg));
			    );
			deliver_to_app(pma, a, delivery_ok);
			break;
		case view_msg:
			MAY_DBG(FN;
			    STRLIT(" learner.msg ");
			    COPY_AND_FREE_GOUT(dbg_pax_msg(pma->learner.msg)); );
			if(site && site->global_node_set.node_set_len == a->body.app_u_u.present.node_set_len){
				assert(site->global_node_set.node_set_len == a->body.app_u_u.present.node_set_len);
				copy_node_set(&a->body.app_u_u.present, &(((site_def *)site)->global_node_set));
				deliver_global_view_msg(site, p->synode);
			}
			break;
#ifdef USE_EXIT_TYPE
		case exit_type:
			g_critical("Unable to get message, process will now exit. Please ensure that the process is restarted");
			exit(1);
			break;
#endif
		default:
			break;
		}
	}
	MAY_DBG(FN; SYCEXP(p->synode));
}



static void	read_missing_values(int n);
static void	propose_missing_values(int n);

static void	find_value(site_def const *site, unsigned int *wait, int n)
{
	DBGOHK(FN; NDBG(*wait, d));

	if(get_nodeno(site) == VOID_NODE_NO){
		read_missing_values(n);
		return;
	}

	switch (*wait) {
	case 0:
	case 1:
		read_missing_values(n);
		(*wait)++;
		break;
	case 2:
		if (iamthegreatest(site))
			propose_missing_values(n);
		else
			read_missing_values(n);
		(*wait)++;
		break;
	case 3:
		propose_missing_values(n);
		break;
	default:
		break;
	}
}
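
/**
   Illustration only: find_value() escalates using the per-message wait
   counter. The first two calls only read the value from other nodes. The
   third call proposes if this node is the leader and reads otherwise, and
   every later call proposes. A node that is not a member of the site
   (VOID_NODE_NO) always reads and never advances the counter. The
   hypothetical sketch below returns the chosen action instead of calling
   read_missing_values() or propose_missing_values().

```c
#include <assert.h>

typedef enum { ACTION_READ, ACTION_PROPOSE, ACTION_NONE } find_action_sketch;

// Decide like find_value() and advance the wait counter the same way.
// The counter stops at 3, after which every call proposes.
find_action_sketch find_value_sketch(unsigned int *wait, int is_member, int is_leader)
{
  if (!is_member)
    return ACTION_READ;
  switch (*wait) {
  case 0:
  case 1:
    (*wait)++;
    return ACTION_READ;
  case 2:
    (*wait)++;
    return is_leader ? ACTION_PROPOSE : ACTION_READ;
  case 3:
    return ACTION_PROPOSE;
  default:
    return ACTION_NONE;
  }
}
```
*/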

int	get_xcom_message(pax_machine **p, synode_no msgno, int n)
{
	DECL_ENV
	    unsigned int	wait;
	double	delay;
	END_ENV;

	TASK_BEGIN

	    ep->wait = 0;
	ep->delay = 0.0;
	*p = get_cache(msgno);

	while (!finished(*p)) {
		site_def const * site = find_site_def(msgno);
		DBGOHK(FN;
		    STRLIT(" not finished ");
		    SYCEXP(msgno); PTREXP(*p);
		    NDBG(ep->wait, u);
		    SYCEXP(msgno));
		if (get_maxnodes(site) > 1 && iamthegreatest(site) &&
		    site->global_node_set.node_set_val &&
		    !site->global_node_set.node_set_val[msgno.node] &&
		    may_be_dead(site->detected, msgno.node, task_now())){
			propose_missing_values(n);
		} else {
			find_value(site, &ep->wait, n);
		}
		TIMED_TASK_WAIT(&(*p)->rv, ep->delay = wakeup_delay(ep->delay));
		*p = get_cache(msgno);
	}

	FINALLY
	    DBGOHK(FN; SYCEXP(msgno); PTREXP(*p); NDBG(ep->wait, u); SYCEXP(msgno));
	TASK_END;
}

synode_no set_executed_msg(synode_no msgno)
{
	DBGOUT(FN; STRLIT("changing executed_msg from "); SYCEXP(executed_msg); STRLIT(" to "); SYCEXP(msgno));
	if (synode_gt(msgno, current_message)) {
		DBGOHK(FN; STRLIT("changing current message"));
		set_current_message(first_free_synode(msgno));
	}

	if (msgno.msgno > executed_msg.msgno)
		task_wakeup(&exec_wait);

	executed_msg = msgno;
	executor_site = find_site_def(executed_msg);
	return executed_msg;
}


static synode_no first_free_synode(synode_no msgno)
{
	site_def const * site = find_site_def(msgno);
	synode_no retval = msgno;
	if(get_group_id(site) == 0){
		DBGOUT(FN; PTREXP(site); SYCEXP(msgno));
		if(site){
			DBGOUT(FN; SYCEXP(site->boot_key); SYCEXP(site->start); COPY_AND_FREE_GOUT(dbg_site_def(site)));
		}
	}
	assert(get_group_id(site) != 0);
	assert(!synode_eq(msgno, null_synode));
	if (retval.msgno == 0)
		retval.msgno = 1;
	retval.node = get_nodeno(site);
	if (synode_lt(retval, msgno))
		return incr_msgno(retval);
	else
		return retval;
}
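
/**
   Illustration only: first_free_synode() claims this node's slot at message
   number msgno. If that slot would sort before msgno, it moves on to the
   next message number. The hypothetical sketch below makes two assumptions.
   Synodes are ordered by message number and then by node number. The
   increment is modelled as msgno + 1 with the node unchanged, although the
   real incr_msgno() may differ in detail.

```c
#include <assert.h>
#include <stdint.h>

// Hypothetical stand-in for synode_no, without the group id.
typedef struct {
  uint64_t msgno;
  uint32_t node;
} synode_sketch;

// Assumed ordering: message number first, then node number.
int synode_lt_sketch(synode_sketch a, synode_sketch b)
{
  return a.msgno < b.msgno || (a.msgno == b.msgno && a.node < b.node);
}

// Claim our own slot at msgno, or at the next message number if our slot
// would come before msgno.
synode_sketch first_free_synode_sketch(synode_sketch msgno, uint32_t self)
{
  synode_sketch retval = msgno;
  if (retval.msgno == 0)
    retval.msgno = 1; // as in the original, message number 0 is mapped to 1
  retval.node = self;
  if (synode_lt_sketch(retval, msgno))
    retval.msgno++;
  return retval;
}
```
*/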



synode_no set_current_message(synode_no msgno)
{
	MAY_DBG(FN; STRLIT("changing current_message from "); SYCEXP(current_message); STRLIT(" to "); SYCEXP(msgno));
	return current_message = msgno;
}


static void	handle_learn(site_def const * site, pax_machine *p, pax_msg *m);
static void	update_max_synode(pax_msg *p);

#if TASK_DBUG_ON
static void perf_dbg(int *_n, int *_old_n, double *_old_t) MY_ATTRIBUTE((unused));
static void perf_dbg(int *_n, int *_old_n, double *_old_t)
{
	int	n = *_n;
	int	old_n = *_old_n;
	double	old_t = *_old_t;

	DBGOHK(FN; SYCEXP(executed_msg));
	if (!(n % 5000)) {
		GET_GOUT;
		NDBG(get_nodeno(get_site_def()), u);
		NDBG(task_now(), f);
		NDBG(n, d);
		NDBG(median_time(), f);
		SYCEXP(executed_msg);
		PRINT_GOUT;
		FREE_GOUT;
	}
	(*_n)++;
	if (task_now() - old_t > 1.0) {
		GET_GOUT;
		NDBG(get_nodeno(get_site_def()), u);
		NDBG(task_now(), f);
		NDBG(n, d);
		NDBG((n - old_n) / (task_now() - old_t), f);
		PRINT_GOUT;
		FREE_GOUT;
		*_old_t = task_now();
		*_old_n = n;
	}
}
#endif

#ifdef IGNORE_LOSERS

static inline int	LOSER(synode_no x, site_def const *site)
{
/*  node_no i = 0;
  node_no n = 0;
  node_no maxnodes = get_maxnodes(site);

  if (maxnodes == 0)
    return 0;

  for (i = 0; i < maxnodes; i++) {
    if (site->global_node_set.node_set_val[i]) {
      n++;
    }
  }
  DBGOUT(NEXP(maxnodes,u); NEXP(n,u)); */
  DBGOUT(NEXP(x.node,u); NEXP(site->global_node_set.node_set_val[(x).node],d));
  return
    /* ( n > maxnodes / 2 || (ARBITRATOR_HACK && (2 == maxnodes && 0 == get_nodeno(site)))) && */
    (!(site)->global_node_set.node_set_val[(x).node] );
}

#else
#define LOSER(x, site) 0
#endif

static void	debug_loser(synode_no x)  MY_ATTRIBUTE((unused));
#if defined(TASK_DBUG_ON) && TASK_DBUG_ON
static void	debug_loser(synode_no x)
{
	if (1 || x.msgno < 10) {
		GET_GOUT;
		NDBG(get_nodeno(find_site_def(x)), u);
		STRLIT(" ignoring loser ");
		SYCEXP(x);
		SYCEXP(max_synode);
		PRINT_GOUT;
		FREE_GOUT;
	}
}
#else
/* purecov: begin deadcode */
static void	debug_loser(synode_no x MY_ATTRIBUTE((unused)))
{
}
/* purecov: end */
#endif

/* #define DBGFIX2(x){ GET_GOUT; ADD_F_GOUT("%f ",task_now()); x; PRINT_GOUT; FREE_GOUT; } */
#define DBGFIX2(x)
static void	send_value(site_def const *site, node_no to, synode_no synode)
{
	pax_machine * pm = get_cache(synode);
	if (pm && pm->learner.msg) {
		pax_msg * msg = clone_pax_msg(pm->learner.msg);
		if (msg == NULL) return;
		ref_msg(msg);
		send_server_msg(site, to, msg);
		unref_msg(&msg);
	}
}

/* Return the message number at which it is safe for nodes in the previous config to exit */
static synode_no compute_delay(synode_no start)
{
	start.msgno += event_horizon;
	return start;
}

/* Push messages to all nodes which were in the previous site but are not in this one */
static void inform_removed(int index,int all)
{
	site_def * *sites = 0;
	uint32_t	site_count = 0;
	DBGFIX2(FN; NEXP(index, d));
	get_all_site_defs(&sites, &site_count);
	while (site_count > 1 && index >= 0 && (uint32_t)(index + 1) < site_count) {
		site_def * s = sites[index];
		site_def * ps = sites[index+1];

		/* Compute diff and push messages */
		DBGFIX2(FN; NDBG(index,d); PTREXP(s); if(s)SYCEXP(s->boot_key); PTREXP(ps); if(ps)SYCEXP(ps->boot_key));

		if (s && ps) {
			node_no i = 0;
			DBGFIX2(FN; SYCEXP(s->boot_key); SYCEXP(s->start);
			    SYCEXP(ps->boot_key); SYCEXP(ps->start));
			for (i = 0; i < ps->nodes.node_list_len; i++) { /* Loop over prev site */
				if (ps->nodeno != i && !node_exists(&ps->nodes.node_list_val[i], &s->nodes)) {
					synode_no synode = s->start;
					synode_no end = compute_delay(s->start);
					while (!synode_gt(synode, end)) { /* Loop over relevant messages */
						send_value(ps, i, synode);
						synode = incr_synode(synode);
					}
				}
			}
		}
		if(! all) /* Early exit if not all configs should be examined */
			break;
		index--;
	}
}
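
/**
   Illustration only: inform_removed() resends learned values to every node
   of the previous config that is absent from the new one. The values cover
   the range from the new config's start up to compute_delay(start), that
   is, start + event_horizon. The hypothetical sketch below simplifies in
   three ways. It sends one value per message number, ignoring the per-node
   slots that incr_synode() steps through. It examines a single pair of
   configs, as with all == 0. It identifies nodes by integer IDs instead of
   using node_exists().

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical callback standing in for send_value(): send the learned value
// of message number msgno to the node at index `to` of the previous config.
typedef void (*send_fn_sketch)(size_t to, uint64_t msgno);

static int contains_id(const int *ids, size_t n, int id)
{
  size_t i;
  for (i = 0; i < n; i++)
    if (ids[i] == id)
      return 1;
  return 0;
}

// prev and next hold the node IDs of the previous and the new config. Each
// node that was removed (in prev but not in next), other than ourselves, is
// sent the values for message numbers start through start + event_horizon.
void inform_removed_sketch(const int *prev, size_t nprev, size_t self_index,
                           const int *next, size_t nnext,
                           uint64_t start, uint64_t event_horizon,
                           send_fn_sketch send_cb)
{
  size_t i;
  uint64_t m;
  for (i = 0; i < nprev; i++) {
    if (i == self_index || contains_id(next, nnext, prev[i]))
      continue;
    for (m = start; m <= start + event_horizon; m++)
      send_cb(i, m);
  }
}

// Example callback for illustration: counts the values that would be sent.
size_t sent_count_example = 0;
void count_send_example(size_t to, uint64_t msgno)
{
  (void)to;
  (void)msgno;
  sent_count_example++;
}
```

   The range is inclusive at both ends, matching the
   !synode_gt(synode, end) loop condition in the original.
*/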

site_def *handle_add_node(app_data_ptr a)
{
	site_def * site = clone_site_def(get_site_def());
	DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&a->body.app_u_u.nodes)); );
	MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_list(&a->body.app_u_u.nodes)); );
	ADD_EVENTS(
	    add_event(string_arg("a->app_key"));
	    add_synode_event(a->app_key);
	    );
	assert(get_site_def());
	assert(site);
	add_site_def(a->body.app_u_u.nodes.node_list_len,
	    a->body.app_u_u.nodes.node_list_val, site);
	site->start = getstart(a);
	site->boot_key = a->app_key;
	site_install_action(site, a->body.c_t);
	return site;
}

static void	terminate_and_exit()
{
	XCOM_FSM(xa_terminate, int_arg(0));	/* Tell xcom to stop */
	XCOM_FSM(xa_exit, int_arg(0));		/* Tell xcom to exit */
	if (xcom_expel_cb) xcom_expel_cb(0);
}

int	terminator_task(task_arg arg)
{
	DECL_ENV
	    double t;
	END_ENV;

	TASK_BEGIN

	    ep->t = get_double_arg(arg);
		TASK_DELAY(ep->t);
		terminate_and_exit();
	FINALLY
	TASK_END;
}

static void	delayed_terminate_and_exit(double t)
{
	task_new(terminator_task, double_arg(t), "terminator_task", XCOM_THREAD_DEBUG);
}

static inline int is_empty_site(site_def const *s)
{
	return s->nodes.node_list_len == 0;
}

site_def *handle_remove_node(app_data_ptr a)
{
	site_def * site = clone_site_def(get_site_def());
	DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&a->body.app_u_u.nodes)));
	ADD_EVENTS(
	    add_event(string_arg("a->app_key"));
	    add_synode_event(a->app_key);
	    add_event(string_arg("nodeno"));
	    add_event(uint_arg(get_nodeno(site)));
	);

	remove_site_def(a->body.app_u_u.nodes.node_list_len,
	    a->body.app_u_u.nodes.node_list_val, site);
	site->start = getstart(a);
	site->boot_key = a->app_key;
	site_install_action(site, a->body.c_t);
	return site;
}

void handle_config(app_data_ptr a)
{
	while(a){
		switch (a->body.c_t) {
			case unified_boot_type:
				install_node_group(a);
				break;
			case add_node_type:
				handle_add_node(a);
				break;
			case remove_node_type:
				handle_remove_node(a);
				if (xcom_shutdown)
					return;
				break;
			case force_config_type:
				install_node_group(a);
				break;
			default:
				break;
		}
		a = a->next;
	}
}

enum exec_state {
	FETCH = 0,
	EXECUTE = 1
};
typedef enum exec_state exec_state;

#define NEXTSTATE(x) ep->state = (x)

static 	synode_no delivered_msg;

synode_no get_delivered_msg()
{
	return delivered_msg;
}

static inline int is_member(site_def const *site)
{
	return site->nodeno != VOID_NODE_NO;
}

/*
Execute xcom message stream.

Beware of the exit logic in this task, which is both simple and not so simple.
Consider three configs C1, C2 and C3. C1 has two nodes, A and B. C2 has only node B.
C3 is empty. Below, Ci denotes both a config and the number of the message that
carries it. A config with message number N will be activated after a delay of (at least)
alpha messages, where alpha is the size of the pipeline (or the event horizon).

So, C1.start = C1+alpha,
and C2.start = C2+alpha. A, which is removed from C1, cannot exit until a majority
of nodes in the new config C2 (in this case B) has learned all the messages from
config C1, which means all messages less than C2.start. How can A know that a majority
of C2 has learned those messages?

If we denote the first message that is not yet decided (and executed) by E,
the proposers will not try to propose messages with number >= E+alpha,
and all incoming tcp messages with message number >= E+alpha will be ignored.
E is incremented by the executor task, so all messages < E are known.
This means that when the value of E+alpha is known, all messages up to
and including E are also known, although not all messages E+1..E+alpha-1
necessarily are known.

This leads to the requirement that a node which is removed (A) needs to wait until
it knows the value of C2.start+alpha, since by then it knows that a majority
of the nodes in C2 are ready to execute C2.start, which in turn implies that
a majority of nodes in C2 knows all the values from config C1. Note that the last
message that should be delivered to the application by a node that is leaving C1 is
C2.start-1, which is the last message of C1.
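
Example (illustrative numbers only): with alpha = 10, suppose C2 is carried by
message 100. Then C2.start = 110, the last message of C1 is 109, and A may not
exit before it knows the value of message 120 (C2.start+alpha). Knowing message
120 implies that a majority of C2 (here B alone) was ready to execute message 110,
and therefore had learned every message of C1, up to and including 109.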

How does a node that is removed get to know values from the next config?
There are two ways, and we use both. First, the node that tries to exit can
simply ask for the message. get_xcom_message() will do this for all messages
<= max_synode, but it may take some time.
Second, the nodes of C2 can send the messages C2.start..C2.start+alpha
to the nodes that are removed (nodes that are in C1 but not in C2).
inform_removed() does this. We take care to handle the case where configs are close enough
that C0 < C1 <= C0+alpha by tracking the oldest config that contains nodes that are
leaving.

This takes care of nodes leaving C1. What about nodes that leave C2? C3 is empty,
so B, which is leaving C2, cannot wait for messages from C3. But since C3 is empty,
there is no need to wait. It can exit immediately after having executed C3.start-1, the
last message of C2. What if C3.start-1 < C2.start+alpha? This can happen if C2 and C3
are close. In that case, B will exit before A gets the chance to learn C2.start+alpha,
which will leave A hanging forever. Clearly, we need to impose an additional constraint,
that C3.start must be greater than C2.start+alpha. This is taken care of by the special
test for an empty config.
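
Example (illustrative numbers only): with alpha = 10 and C2 carried by message 100
as above, suppose the empty C3 is carried by message 105. Without the special test,
C3.start would be 115, so B could exit after executing message 114, before A has
learned message 120, and A would wait forever. The constraint requires
C3.start > 120. The executor enforces it by applying compute_delay() twice more to
the start of an empty config; assuming compute_delay() adds alpha to the message
number, C3.start becomes 135.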

Complicated and confusing? Not really, but there is a clean and simple solution which has
not been implemented yet, since it requires more changes to the consensus logic.
If we require that for the messages C2..C2.start-1 we have a majority from both the nodes
in C1 and the nodes in C2, the nodes not in C2 can exit when they have executed message
C2.start-1, since we then know that a majority of the nodes of C2 has agreed on those messages
as well, so they do not depend on the nodes not in C2 any more. This holds even if C2 is empty.
Note that requiring a majority from both C1 and C2 is different from requiring a majority from
C1+C2, which means that the proposer logic needs to consider answers from two different sets of
acceptors for those messages. Since acceptors are identified by their node number, and the node
numbers need not be the same for both configs, we need to maintain a mapping between the nodes
numbers of any two consecutive configs. Alternatively, we could remove the node numbers altogether,
and always use a unique, unchanging ID for a node, like IP address + port.

*/

/* FIFO which tracks the message numbers where we should deliver queued messages or
inform the removed nodes */
#define FIFO_SIZE 1000
static struct {
	int n;
	int front;
	int rear;
	synode_no q[FIFO_SIZE];
}delay_fifo;

static inline int addone(int i)
{
	return ((i + 1) % FIFO_SIZE);
}

/* Is queue empty?  */
static inline int fifo_empty()
{
	return delay_fifo.n <= 0;
}

/* Is queue full?  */
static inline int fifo_full()
{
	return delay_fifo.n >= FIFO_SIZE;
}


/* Insert s at the rear of the queue. If the queue is full, s is silently dropped. */
static inline void fifo_insert(synode_no s)
{
	if(! fifo_full()){
		delay_fifo.n++;
		delay_fifo.q[delay_fifo.rear] = s;
		delay_fifo.rear = addone(delay_fifo.rear);
	}
}

/* Remove and return the first element of the queue, or null_synode if it is empty. */
static inline synode_no fifo_extract()
{
	if(! fifo_empty()){
		synode_no ret = delay_fifo.q[delay_fifo.front];
		delay_fifo.front = addone(delay_fifo.front);
		delay_fifo.n--;
		return ret;
	}else{
		return null_synode;
	}
}

/* Return the first element of the queue without removing it, or null_synode if it is empty. */
static inline synode_no fifo_front()
{
	if(! fifo_empty()){
		return delay_fifo.q[delay_fifo.front];
	}else{
		return null_synode;
	}
}
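
/*
  Example trace (illustrative), starting from the state set up by
  executor_task() (n = 0, front = 0, rear = 0):
    fifo_insert(s1); fifo_insert(s2);  ->  n = 2, front = 0, rear = 2
    fifo_front()                       ->  s1, state unchanged
    fifo_extract()                     ->  s1; n = 1, front = 1, rear = 2
  addone() wraps indices modulo FIFO_SIZE, so index 999 is followed by 0.
  Once n reaches FIFO_SIZE, fifo_insert() drops further entries without
  reporting an error.
*/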

static int	executor_task(task_arg arg MY_ATTRIBUTE((unused)))
{
	DECL_ENV
	    pax_machine * p;
	int	n ;
	int	old_n;
	double	old_t;
	synode_no exit_synode;
	exec_state state;
	enum {
		no_exit,
		not_member_exit,
		empty_exit
	} exit_type;
	int inform_index;
	END_ENV;

	TASK_BEGIN
	    ep->p = NULL;
	ep->n = 0;
	ep->old_n = 0;
	ep->old_t = task_now();
	ep->exit_synode = null_synode;
	ep->exit_type = no_exit;
	ep->inform_index = -1;
	delay_fifo.n = 0;
	delay_fifo.front = 0;
	delay_fifo.rear = 0;

	set_last_received_config(null_synode);

	if (executed_msg.msgno == 0)
		executed_msg.msgno = 1;
	delivered_msg = executed_msg;
	NEXTSTATE(FETCH);
	executor_site = find_site_def(executed_msg);

	while (!xcom_shutdown) {
		for (; ; ) {
			if (ep->state == FETCH) {
				if ( !LOSER(executed_msg, executor_site)) {
					TASK_CALL(get_xcom_message(&ep->p, executed_msg, FIND_MAX));
					DBGOUT(FN; STRLIT("got message "); SYCEXP(ep->p->synode); COPY_AND_FREE_GOUT(dbg_app_data(XAPP)));
					/* Execute unified_boot immediately, but do not deliver site message until we */
					/* are ready to execute messages from the new site definition. */
					/* At that point we can be certain that a majority have learned */
					/* everything from the old site. */

					if ((XAPP) && is_config((XAPP)->body.c_t) &&
						synode_gt(executed_msg, get_site_def()->boot_key)) /* Redo test */
					{
						site_def * site = 0;
						set_last_received_config(executed_msg);
						handle_config(XAPP);
						garbage_collect_site_defs(delivered_msg);
						check_tasks();
						site = get_site_def_rw();
						if (site == 0) {
							TERMINATE;
						}
						DBGFIX2(FN; STRLIT("new config "); SYCEXP(site->boot_key); ); /*SYCEXP(site->start); NEXP(get_nodeno(site),d); NEXP(ARBITRATOR_HACK,d);
								NEXP(ep->exit_type,d); SYCEXP(ep->exit_synode);
								SYCEXP(executed_msg); SYCEXP(max_synode)); */

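						/*
						  If the new site is empty, delay its start further (compute_delay()
						  is applied twice), so that nodes removed by the previous config can
						  learn the messages they wait for before the remaining members exit.
						  See the discussion of empty configs above executor_task().
						*/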
						if (is_empty_site(site)) {
							site->start = compute_delay(compute_delay(site->start));
						}
						if (ep->exit_type == no_exit){/* We have not yet set the exit trigger */
							synode_no delay_until;
							if(is_member(site)){
								delay_until = compute_delay(site->start);
							} else { /* Not in this site */
								/*
									See if site will be empty when we leave.
									If the new site is empty, we should exit after having
									delivered the last message from the old site.
								*/
								if (is_empty_site(site)) {
									ep->exit_synode = decr_synode(site->start);
									ep->exit_type = empty_exit;
									delay_until = ep->exit_synode;
									DBGFIX2(FN; SYCEXP(ep->exit_synode); SYCEXP(executed_msg); SYCEXP(max_synode));
								}else{
									/*
										If we are not a member of the new site, we should exit after having
										seen enough messages from the new site.
									 */
									ep->exit_synode = compute_delay(site->start);
									ep->exit_type = not_member_exit;
									if (!synode_lt(ep->exit_synode, max_synode)){
										/* We need messages from the next site, so set max_synode accordingly. */
										set_max_synode(incr_synode(ep->exit_synode));
									}
									delay_until = ep->exit_synode;
									DBGFIX2(FN; SYCEXP(delay_until); SYCEXP(executed_msg); SYCEXP(max_synode));
									DBGFIX2(FN; SYCEXP(ep->exit_synode); SYCEXP(executed_msg); SYCEXP(max_synode));
								}
							}
	
							if (synode_gt(delay_until, max_synode))
								set_max_synode(delay_until);
							fifo_insert(delay_until);
							ep->inform_index++;
						}
					} else {
						DBGOUT(FN; SYCEXP(executed_msg); SYCEXP(get_site_def()->boot_key));
					}
				} else {
					DBGOUT(FN; debug_loser(executed_msg); PTREXP(executor_site);
						COPY_AND_FREE_GOUT(dbg_node_set(executor_site->global_node_set)));
				}
				DBGOUT(FN; NDBG(ep->state, d); SYCEXP(delivered_msg); SYCEXP(executed_msg);
					SYCEXP(ep->exit_synode); NDBG(ep->exit_type, d));

				/* See if we should exit when having seen this message */
				if (ep->exit_type == not_member_exit && synode_eq(executed_msg, ep->exit_synode)) {
					inform_removed(ep->inform_index, 1); /* Inform all removed nodes before we exit */
					delayed_terminate_and_exit(TERMINATE_DELAY);	/* Tell xcom to stop */
					TERMINATE;
				}

				if (fifo_empty()) {
					NEXTSTATE(EXECUTE);
				} else if (synode_eq(executed_msg, fifo_front())) {
					DBGFIX2(FN; SYCEXP(fifo_front()); SYCEXP(executed_msg);
							SYCEXP(ep->exit_synode); NDBG(ep->exit_type, d));
					while(synode_eq(executed_msg, fifo_front())){ /* More than one may match */
						inform_removed(ep->inform_index, 0);
						fifo_extract();
						ep->inform_index--;
					}
					garbage_collect_servers();
					NEXTSTATE(EXECUTE);
				}
				SET_EXECUTED_MSG(incr_synode(executed_msg));
				MAY_DBG(FN; NDBG(ep->state, d); SYCEXP(fifo_front()); SYCEXP(executed_msg));
				MAY_DBG(FN; NDBG(ep->state, d); SYCEXP(ep->exit_synode); SYCEXP(executed_msg));
			} else if (ep->state == EXECUTE) {
				site_def const * x_site = find_site_def(delivered_msg);

				DBGOUT(FN; NDBG(ep->state, d); SYCEXP(delivered_msg); SYCEXP(executed_msg);
					SYCEXP(ep->exit_synode); NDBG(ep->exit_type, d));
				ep->p = get_cache(delivered_msg);
				ADD_EVENTS(
					add_event(string_arg("executing message"));
					add_synode_event(ep->p->synode);
				);
				if (LOSER(delivered_msg, x_site)) {
#ifdef IGNORE_LOSERS
					DBGOUT(FN; debug_loser(delivered_msg); PTREXP(x_site); dbg_node_set(x_site->global_node_set));
#endif
				} else if ((ep->p)->learner.msg->msg_type != no_op) {
					execute_msg(find_site_def(delivered_msg), ep->p, ep->p->learner.msg);
#if defined(TASK_DBUG_ON) && TASK_DBUG_ON
					DBGOUT(perf_dbg(&ep->n, &ep->old_n, &ep->old_t));
#endif
				}
				/* Garbage collect old servers */
				if (synode_eq(delivered_msg, x_site->start)) {
					garbage_collect_servers();
				}
				/* See if we should exit when having delivered this message */
				if (ep->exit_type == empty_exit && synode_eq(delivered_msg, ep->exit_synode)) {
					inform_removed(ep->inform_index, 1); /* Inform all removed nodes before we exit */
					delayed_terminate_and_exit(TERMINATE_DELAY);	/* Tell xcom to stop */
					TERMINATE;
				}
				delivered_msg = incr_synode(delivered_msg);
				if (synode_eq(delivered_msg, executed_msg)) {
					NEXTSTATE(FETCH);
				}
			} else {
				abort();
			}
		}
	}
	FINALLY
		DBGOUT(FN; STRLIT(" shutdown "); SYCEXP(executed_msg); NDBG(task_now(), f));
	TASK_END;
}


static synode_no  get_sweep_start()
{
	synode_no    find = executed_msg;
	find.node = get_nodeno(find_site_def(find));
	if (find.node < executed_msg.node) {
		find = incr_msgno(find);
	}
	return find;
}
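
/*
  Example (illustrative): if executed_msg is {msgno 50, node 2} and this node
  has node number 1 in the site that owns message 50, find starts as {50, 1}.
  Since 1 < 2, that slot precedes executed_msg, so incr_msgno() advances find
  to message 51. With node number 3, the sweep would start at {50, 3}. In other
  words, the sweeper starts at the first slot owned by this node that does not
  precede executed_msg.
*/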


static int	sweeper_task(task_arg arg MY_ATTRIBUTE((unused)))
{
	DECL_ENV
	    synode_no find;
	END_ENV;

	TASK_BEGIN

	    ep->find = get_sweep_start();

	while (!xcom_shutdown) {
		ep->find.group_id = executed_msg.group_id; /* In case group id has changed */
#ifndef AGGRESSIVE_SWEEP
		while (!is_only_task()) {
			TASK_YIELD;
		}
#endif
		ADD_EVENTS(
			add_event(string_arg("sweeper ready"));
			add_synode_event(executed_msg);
		);
/*		DBGOUT(FN; STRLIT("ready to run ");   */
/*			SYCEXP(executed_msg); SYCEXP(max_synode); SYCEXP(ep->find));  */
		 {
			while (synode_lt(ep->find, max_synode) && ! too_far(ep->find)) {
				/* pax_machine * pm = hash_get(ep->find); */
				pax_machine * pm = 0;
				ADD_EVENTS(
					add_event(string_arg("sweeper examining"));
					add_synode_event(ep->find);
				);
				DBGOUT(FN; STRLIT("examining "); SYCEXP(ep->find));
				if (ep->find.node == VOID_NODE_NO) {
					if(synode_gt(executed_msg, ep->find)){
						ep->find = get_sweep_start();
					}
					if (ep->find.node == VOID_NODE_NO)
						goto deactivate;
				}
				pm = get_cache(ep->find);
				if (pm && !pm->force_delivery) { /* We want full 3 phase Paxos for forced messages */
					/* DBGOUT(FN; dbg_pax_machine(pm)); */
					if (!is_busy_machine(pm) && pm->acceptor.promise.cnt == 0 && ! pm->acceptor.msg && !finished(pm)) {
						pm->op = skip_op;
						ADD_EVENTS(
							add_event(string_arg("sweeper skipping"));
							add_synode_event(ep->find);
							add_event(string_arg(pax_op_to_str(pm->op)));
						);
						skip_msg(pax_msg_new(ep->find, find_site_def(ep->find)));
						MAY_DBG(FN; STRLIT("skipping "); SYCEXP(ep->find));
/* 						MAY_DBG(FN; dbg_pax_machine(pm)); */
					}
				}
				ep->find = incr_msgno(ep->find);
			}
		}
	deactivate:
		TASK_DEACTIVATE;
	}
	FINALLY
	    MAY_DBG(FN; STRLIT(" shutdown sweeper "); SYCEXP(executed_msg); NDBG(task_now(), f));
	TASK_END;
}


/* }}} */

#if 0
static double	wakeup_delay(double old)
{
	double	retval = 0.0;
	if (0.0 == old) {
		double	m = median_time();
		if (m == 0.0 || m > 1.0)
			m = 0.1;
		retval = 0.1 + 10.0 * m + m * my_drand48();
	} else {
		retval = old * 1.4142136; /* Exponential backoff */
	}
	while (retval > 10.0)
		retval /= 1.31415926;
	/* DBGOUT(FN; NDBG(retval,d)); */
	return retval;
}


#else
static double	wakeup_delay(double old)
{
	double	retval = 0.0;
	if (0.0 == old) {
		double	m = median_time();
		if (m == 0.0 || m > 0.3)
			m = 0.1;
		retval = 0.1 + 5.0 * m + m * my_drand48();
	} else {
		retval = old * 1.4142136; /* Exponential backoff */
	}
	while (retval > 3.0)
		retval /= 1.31415926;
	/* DBGOUT(FN; NDBG(retval,d)); */
	return retval;
}
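
/*
  Worked example for the version above (illustrative; assumes my_drand48()
  returns values in [0, 1) like drand48()): if median_time() reports 0.0 or a
  value above 0.3, m is taken as 0.1, so the first delay is
  0.1 + 5.0 * 0.1 + 0.1 * r, i.e. between 0.6 and 0.7 (in task_now() units).
  Starting at 0.6, successive calls give roughly
    0.85, 1.20, 1.70, 2.40,
    3.39 / 1.314 = 2.58, 3.65 / 1.314 = 2.78, ...
  Once retval has been scaled down at least once, every later value stays in
  the interval (3.0 / 1.314, 3.0], roughly (2.28, 3.0].
*/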


#endif

static void	propose_noop(synode_no find, pax_machine *p)
{
	/* Prepare to send a noop */
	site_def const *site = find_site_def(find);
	pax_msg *clone = NULL;
	assert(! too_far(find));
	replace_pax_msg(&p->proposer.msg, pax_msg_new(find, site));
	assert(p->proposer.msg);
	create_noop(p->proposer.msg);
/*  	DBGOUT(FN; SYCEXP(find););  */

	clone = clone_pax_msg(p->proposer.msg);
	if (clone != NULL) {
		push_msg_3p(site, p, clone, find, no_op);
	} else {
		G_DEBUG("Unable to propose NoOp due to an OOM error.");
	}
}


static void	send_read(synode_no find)
{
	/* Prepare to send a read_op */
	site_def const *site = find_site_def(find);
	MAY_DBG(FN; NDBG(get_maxnodes(site),u); NDBG(get_nodeno(site),u););
	if (site && find.node != get_nodeno(site)) {
		pax_msg * pm = pax_msg_new(find, site);
		ref_msg(pm);
		create_read(site, pm);
		MAY_DBG(FN; SYCEXP(find););

		MAY_DBG(FN; NDBG(get_maxnodes(site),u); NDBG(get_nodeno(site),u); PTREXP(pm));
		/* send_server_msg(site, find.node, pm); */
#if 0
		send_to_others(site, pm, "send_read");
#else
		if(get_nodeno(site) == VOID_NODE_NO)
			send_to_others(site, pm, "send_read");
		else
			send_to_someone(site, pm, "send_read");
#endif
		unref_msg(&pm);
	}
}


/* }}} */

/* {{{ Find missing values */

static int	ok_to_propose(pax_machine *p)
{
#if 0
	site_def const *s = find_site_def(p->synode.group_id);
	int	retval = (p->synode.node == get_nodeno(s) || task_now() -p->last_modified > DETECTOR_LIVE_TIMEOUT || may_be_dead(s->detected, p->synode.node, task_now()))
	 && !recently_active(p) && !finished(p) && !is_busy_machine(p);
#else
	int	retval = !recently_active(p) && !finished(p) && !is_busy_machine(p);
#endif
	MAY_DBG(FN; NDBG(p->synode.node, u); NDBG(recently_active(p),d); NDBG(finished(p),d); NDBG(is_busy_machine(p),d); NDBG(retval, d));
	return retval;
}


static void	read_missing_values(int n)
{
	synode_no find = executed_msg;
	synode_no end = max_synode;
	int	i = 0;

	MAY_DBG(FN; SYCEXP(find); SYCEXP(end));
	if (synode_gt(executed_msg, max_synode) ||
	    synode_eq(executed_msg, null_synode))
		return;

	while (!synode_gt(find, end) && i < n && ! too_far(find)) {
		pax_machine * p = get_cache(find);
		ADD_EVENTS(
			add_synode_event(find);
			add_synode_event(end);
			add_event(string_arg("active "));
			add_event(int_arg(recently_active(p)));
			add_event(string_arg("finished  "));
			add_event(int_arg(finished(p)));
			add_event(string_arg("busy "));
			add_event(int_arg(is_busy_machine(p)));
		);
		MAY_DBG(FN; SYCEXP(find); SYCEXP(end); NDBG(recently_active(p), d); NDBG(finished(p), d); NDBG(is_busy_machine(p), d));


		if (!recently_active(p) && !finished(p) && !is_busy_machine(p)) {
			send_read(find);
		}
		find = incr_synode(find);
		i++;
	}
}


static void	propose_missing_values(int n)
{
	synode_no find = executed_msg;
	synode_no end = max_synode;
	int	i = 0;

	DBGOHK(FN; NDBG(get_maxnodes(get_site_def()), u); SYCEXP(find); SYCEXP(end));
	if (
	    synode_gt(executed_msg, max_synode) ||
	    synode_eq(executed_msg, null_synode))
		return;

	MAY_DBG(FN; SYCEXP(find); SYCEXP(end));
	i = 0;
	while (!synode_gt(find, end) && i < n && ! too_far(find)) {
		pax_machine * p = get_cache(find);
		DBGOHK(FN; NDBG(ok_to_propose(p),d); TIMECEXP(task_now()); TIMECEXP(p->last_modified); SYCEXP(find));
		if(get_nodeno(find_site_def(find)) == VOID_NODE_NO)
			break;
		if (ok_to_propose(p)) {
			if (task_now() - BUILD_TIMEOUT > p->last_modified){
				propose_noop(find, p);
			}
		}
		find = incr_synode(find);
		i++;
	}
}


/* Propose a noop for the range find..end */
void	request_values(synode_no find, synode_no end)
{
	DBGOUT(FN; SYCEXP(find); SYCEXP(end); );
	while (!synode_gt(find, end) && ! too_far(find)) {
		pax_machine * p = get_cache(find);
		site_def const *site = find_site_def(find);
		if(get_nodeno(site) == VOID_NODE_NO)
			break;
		if (!finished(p) && !is_busy_machine(p)) {
			/* Prepare to send a noop */
			replace_pax_msg(&p->proposer.msg, pax_msg_new(find, site));
			assert(p->proposer.msg);
			create_noop(p->proposer.msg);

			DBGOUT(FN; STRLIT("propose"); SYCEXP(find); );
			push_msg_3p(site, p, pax_msg_new(find, site), find, no_op);
		}
		find = incr_synode(find);
	}
}


/* }}} */

/* {{{ Message handlers */
#if 0
void	reply_msg(site_def const * site, pax_msg *m)
{
	MAY_DBG(FN; );
	if (get_server(s, m->from)) {
		send_server_msg(s, m->from, m);
	}
}

#else
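/*
  Reply to the sender of m: dispatch locally if m came from this node,
  send directly if the sender is a known node of this site with a server
  connection and the group id matches, and otherwise queue the reply on
  reply_queue. Expects site and reply_queue to be in scope where the macro
  is used.
*/
#define reply_msg(m) \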
{ \
  if(is_local_node((m)->from, site)){ \
    dispatch_op(site, m, NULL); \
  }else{ \
    if(node_no_exists((m)->from, site) && (m)->group_id == get_group_id(site) && get_server(site, (m)->from)){ \
			send_server_msg(site, (m)->from, m); \
		}else{ \
			link_into(&(msg_link_new((m), (m)->from)->l), reply_queue); \
		} \
	} \
}
#endif


#define CREATE_REPLY(x) pax_msg *reply = NULL; CLONE_PAX_MSG(reply, x)
#define SEND_REPLY reply_msg(reply); replace_pax_msg(&reply, NULL)

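/*
  Copy source into (*target)->a. If the copy fails to allocate, set
  oom_abort, release *target via replace_pax_msg(target, NULL) so that it
  becomes NULL, and return FALSE; otherwise return TRUE.
*/
bool_t safe_app_data_copy(pax_msg **target, app_data_ptr source) {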
  copy_app_data(&(*target)->a, source);
  if ((*target)->a == NULL && source != NULL) {
    oom_abort = 1;
    replace_pax_msg(target, NULL);
    return FALSE;
  }
  return TRUE;
}

static void	teach_ignorant_node(site_def const * site, pax_machine *p, pax_msg *pm, synode_no synode, linkage *reply_queue)
{
	CREATE_REPLY(pm);
	DBGOUT(FN; SYCEXP(synode));
	reply->synode = synode;
	reply->proposal = p->learner.msg->proposal;
	reply->msg_type =  p->learner.msg->msg_type;
	safe_app_data_copy(&reply, p->learner.msg->a);
	if (reply != NULL) {
	  set_learn_type(reply);
	  /* set_unique_id(reply, p->learner.msg->unique_id); */
	  SEND_REPLY;
	}
}


/* Handle incoming read */
static void	handle_read(site_def const * site, pax_machine *p, linkage *reply_queue, pax_msg *pm)
{
	DBGOUT(FN;
	    BALCEXP(pm->proposal);
	    BALCEXP(p->acceptor.promise);
	    if (p->acceptor.msg)
	    BALCEXP(p->acceptor.msg->proposal);
	    STRLIT("type "); STRLIT(pax_msg_type_to_str(pm->msg_type)))         ;

	if (finished(p)) { /* We have learned a value */
		teach_ignorant_node(site, p, pm, pm->synode, reply_queue);
	}
}


#ifdef USE_EXIT_TYPE
static void	miss_prepare(site_def const * site, pax_msg *pm, linkage *reply_queue)
{
	CREATE_REPLY(pm);
	DBGOUT(FN; SYCEXP(pm->synode));
	reply->msg_type = normal;
	reply->a = new_app_data();
	reply->a->body.c_t = exit_type;
	reply->op = ack_prepare_op;
	SEND_REPLY;
}


static void	miss_accept(site_def const * site, pax_msg *pm, linkage *reply_queue)
{
	CREATE_REPLY(pm);
	DBGOUT(FN; SYCEXP(pm->synode));
	ref_msg(reply);
	reply->msg_type = normal;
	reply->op = ack_accept_op;
	if (servers[pm->from]) {
		send_server_msg(site, pm->from, pm);
	}
	SEND_REPLY;
}


#endif

static void	handle_simple_prepare(site_def const * site, pax_machine *p, pax_msg *pm, synode_no synode, linkage *reply_queue)
{
	if (finished(p)) { /* We have learned a value */
		MAY_DBG(FN; SYCEXP(synode); BALCEXP(pm->proposal); NDBG(finished(p), d));
		teach_ignorant_node(site, p, pm, synode, reply_queue);
	} else {
		int	greater = gt_ballot(pm->proposal, p->acceptor.promise); /* Paxos acceptor phase 1 decision */
		MAY_DBG(FN; SYCEXP(synode); BALCEXP(pm->proposal); NDBG(greater, d));
		if (greater || noop_match(p, pm) ) {
			CREATE_REPLY(pm);
			reply->synode = synode;
			if (greater)
				p->acceptor.promise = pm->proposal; /* promise to not accept any less */
			if (accepted(p)) { /* We have accepted a value */
				reply->proposal = p->acceptor.msg->proposal;
				reply->msg_type =  p->acceptor.msg->msg_type;
				MAY_DBG(FN; STRLIT(" already accepted value "); SYCEXP(synode));
				reply->op = ack_prepare_op;
				safe_app_data_copy(&reply, p->acceptor.msg->a);
				if (reply == NULL) return;  // Failed to allocate memory for the copy.
			} else {
				MAY_DBG(FN; STRLIT(" no value synode "); SYCEXP(synode));
				reply->op = ack_prepare_empty_op;
			}
			SEND_REPLY;
		}
	}
}
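
/*
  Example (illustrative; assumes gt_ballot() orders ballots by their counter
  first and by node number second): an acceptor that has promised ballot
  (3, node 1) and receives a prepare for (4, node 0) updates its promise to
  (4, node 0). It answers with ack_prepare_op, carrying the previously
  accepted value and that value's ballot, if it has accepted one, and with
  ack_prepare_empty_op otherwise. A later prepare for (3, node 2) is not
  greater than (4, node 0), so it gets no reply unless noop_match() holds.
*/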


/* Handle incoming prepare */
static void	handle_prepare(site_def const * site, pax_machine *p, linkage *reply_queue, pax_msg *pm)
{
		ADD_EVENTS(
			add_synode_event(p->synode);
			add_event(string_arg("pm->from"));
			add_event(int_arg(pm->from));
			add_event(string_arg(pax_op_to_str(pm->op)));
		);
#if 0
	DBGOUT(FN;
	    NDBG(pm->from, d); NDBG(pm->to, d);
	    SYCEXP(pm->synode);
	    BALCEXP(pm->proposal); BALCEXP(p->acceptor.promise));
#endif
	MAY_DBG(FN; BALCEXP(pm->proposal);
	    BALCEXP(p->acceptor.promise);
	    if (p->acceptor.msg)
	    BALCEXP(p->acceptor.msg->proposal);
	    STRLIT("type "); STRLIT(pax_msg_type_to_str(pm->msg_type)))         ;

	handle_simple_prepare(site, p, pm, pm->synode, reply_queue);
}


static void	check_propose(site_def const * site, pax_machine *p)
{
	MAY_DBG(FN; SYCEXP(p->synode);
	    COPY_AND_FREE_GOUT(dbg_machine_nodeset(p, get_maxnodes(site)));
	    );
	PAX_MSG_SANITY_CHECK(p->proposer.msg);
	if (prep_majority(site, p)) {
		p->proposer.msg->proposal = p->proposer.bal;
		BIT_ZERO(p->proposer.prop_nodeset);
		p->proposer.msg->synode = p->synode;
		propose_msg(p->proposer.msg);
		p->proposer.sent_prop = p->proposer.bal;
	}
}


static void	check_learn(site_def const * site, pax_machine *p)
{
	MAY_DBG(FN; SYCEXP(p->synode);
	    COPY_AND_FREE_GOUT(dbg_machine_nodeset(p, get_maxnodes(site)));
	    );
	PAX_MSG_SANITY_CHECK(p->proposer.msg);
	if (get_nodeno(site) != VOID_NODE_NO && prop_majority(site, p)) {
		p->proposer.msg->synode = p->synode;
		if (p->proposer.msg->receivers)
			free_bit_set(p->proposer.msg->receivers);
		p->proposer.msg->receivers = clone_bit_set(p->proposer.prep_nodeset);
		BIT_SET(get_nodeno(site), p->proposer.msg->receivers);
		if(no_duplicate_payload)
			tiny_learn_msg(site, p->proposer.msg);
		else
			learn_msg(site, p->proposer.msg);
		p->proposer.sent_learn = p->proposer.bal;
	}
}


static void	do_learn(site_def const * site MY_ATTRIBUTE((unused)), pax_machine *p, pax_msg *m)
{
	ADD_EVENTS(
		add_synode_event(p->synode);
		add_event(string_arg("m->from"));
		add_event(int_arg(m->from));
		add_event(string_arg(pax_op_to_str(m->op)));
	);
	/* FN; SYCEXP(p->synode); SYCEXP(m->synode); STRLIT(NEWLINE); */
	MAY_DBG(FN; SYCEXP(p->synode); SYCEXP(m->synode);
	    dbg_bitset(m->receivers, get_maxnodes(site));
	    );
	if (m->a)
		m->a->chosen = TRUE;
	replace_pax_msg(&p->acceptor.msg, m);
	replace_pax_msg(&p->learner.msg, m);
	/*
	   Track memory used by client data in the cache.
	   If we do not care about instances that are being decided,
	   it is only necessary to compute the added memory when we
	   record the outcome of a consensus round.
	*/
	add_cache_size(pax_machine_size(p));
	/* Shrink the cache size if necessary */
	shrink_cache();
}


static void	handle_simple_ack_prepare(site_def const * site, pax_machine *p, pax_msg *m)
{
	if(get_nodeno(site) != VOID_NODE_NO)
		BIT_SET(m->from, p->proposer.prep_nodeset);
}


/* Other node has already accepted a value */
static void	handle_ack_prepare(site_def const * site, pax_machine *p, pax_msg *m)
{
	ADD_EVENTS(
		add_synode_event(p->synode);
		add_event(string_arg("m->from"));
		add_event(int_arg(m->from));
		add_event(string_arg(pax_op_to_str(m->op)));
	);
#if 0
	DBGOUT(FN;
	    NDBG(pm->from, d); NDBG(pm->to, d);
	    SYCEXP(pm->synode);
	    BALCEXP(pm->proposal); BALCEXP(p->acceptor.promise));
#endif
	assert(m);
	MAY_DBG(FN;
	    if (p->proposer.msg)
	    BALCEXP(p->proposer.msg->proposal);
	    BALCEXP(p->proposer.bal);
	    BALCEXP(m->reply_to);
	    BALCEXP(p->proposer.sent_prop);
	    SYCEXP(m->synode))		;
	if (m->from != VOID_NODE_NO && eq_ballot(p->proposer.bal, m->reply_to)) { /* answer to my prepare */
		handle_simple_ack_prepare(site, p, m);
		if (gt_ballot(m->proposal, p->proposer.msg->proposal)) { /* greater */
			replace_pax_msg(&p->proposer.msg, m);
			assert(p->proposer.msg);
		}
		if (gt_ballot(m->reply_to, p->proposer.sent_prop))
			check_propose(site, p);
	}
}


/* Other node has not already accepted a value */
static void	handle_ack_prepare_empty(site_def const * site, pax_machine *p, pax_msg *m)
{
	ADD_EVENTS(
		add_synode_event(p->synode);
		add_event(string_arg("m->from"));
		add_event(int_arg(m->from));
		add_event(string_arg(pax_op_to_str(m->op)));
	);
#if 0
	DBGOUT(FN;
	    NDBG(pm->from, d); NDBG(pm->to, d);
	    SYCEXP(pm->synode);
	    BALCEXP(pm->proposal); BALCEXP(p->acceptor.promise));
#endif
	MAY_DBG(FN;
	    if (p->proposer.msg)
	    BALCEXP(p->proposer.msg->proposal);
	    BALCEXP(p->proposer.bal);
	    BALCEXP(m->reply_to);
	    BALCEXP(p->proposer.sent_prop);
	    SYCEXP(m->synode))		;
	if (m->from != VOID_NODE_NO && eq_ballot(p->proposer.bal, m->reply_to)) { /* answer to my prepare */
		handle_simple_ack_prepare(site, p, m);
		if (gt_ballot(m->reply_to, p->proposer.sent_prop))
			check_propose(site, p);
	}
}


/* #define AUTO_MSG(p,synode) {if(!(p)){replace_pax_msg(&(p), pax_msg_new(synode, site));} */

static void	handle_simple_accept(site_def const * site, pax_machine *p, pax_msg *m, synode_no synode, linkage *reply_queue)
{
	if (finished(p)) { /* We have learned a value */
		teach_ignorant_node(site, p, m, synode, reply_queue);
	} else if (!gt_ballot(p->acceptor.promise, m->proposal) || /* Paxos acceptor phase 2 decision */
	noop_match(p, m) ) {
		MAY_DBG(FN; SYCEXP(m->synode); STRLIT("accept "); BALCEXP(m->proposal));
		replace_pax_msg(&p->acceptor.msg, m);
		 {
			CREATE_REPLY(m);
			reply->op = ack_accept_op;
			reply->synode = synode;
			SEND_REPLY;
		}
	}
}
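
/*
  Example (same illustrative ballot ordering as assumed for
  handle_simple_prepare()): with a promise of (4, node 0), an accept for
  (4, node 0) or (5, node 2) is stored in p->acceptor.msg and answered with
  ack_accept_op, since the promise is not greater than the proposal. An
  accept for (3, node 2) is dropped without a reply unless noop_match() holds.
  Accepting does not change p->acceptor.promise.
*/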


/* Accept the value unless we have promised a greater ballot */
static void	handle_accept(site_def const * site, pax_machine *p, linkage *reply_queue, pax_msg *m)
{
	MAY_DBG(FN;
	    BALCEXP(p->acceptor.promise);
	    BALCEXP(m->proposal);
	    STREXP(pax_msg_type_to_str(m->msg_type)));
	PAX_MSG_SANITY_CHECK(m);
	ADD_EVENTS(
		add_synode_event(p->synode);
		add_event(string_arg("m->from"));
		add_event(int_arg(m->from));
		add_event(string_arg(pax_op_to_str(m->op)));
	);

	handle_simple_accept(site, p, m, m->synode, reply_queue);
}


/* Handle answer to accept */
static void	handle_ack_accept(site_def const * site, pax_machine *p, pax_msg *m)
{
	ADD_EVENTS(
		add_synode_event(p->synode);
		add_event(string_arg("m->from"));
		add_event(int_arg(m->from));
		add_event(string_arg(pax_op_to_str(m->op)));
	);
	MAY_DBG(FN; SYCEXP(m->synode); BALCEXP(p->proposer.bal); BALCEXP(p->proposer.sent_learn); BALCEXP(m->proposal); BALCEXP(m->reply_to);
	    );
	MAY_DBG(FN; SYCEXP(p->synode);
	    if (p->acceptor.msg)
	    BALCEXP(p->acceptor.msg->proposal);
	    BALCEXP(p->proposer.bal);
	    BALCEXP(m->reply_to);
	    )		;
	if (get_nodeno(site) != VOID_NODE_NO && m->from != VOID_NODE_NO &&
      eq_ballot(p->proposer.bal, m->reply_to)) { /* answer to my accept */
		BIT_SET(m->from, p->proposer.prop_nodeset);
		if (gt_ballot(m->proposal, p->proposer.sent_learn))
			check_learn(site, p);
	}
}

/* Configure all messages in the half-open interval [start, end) to be forced.
   Stops early at the first message for which this node is not a member of the site. */
static void force_interval(synode_no start, synode_no end)
{
	while (synode_lt(start, end)) {
		pax_machine * p = get_cache(start);
		if(get_nodeno(find_site_def(start)) == VOID_NODE_NO)
			break;
		/* if(! finished(p)) */
		p->force_delivery = 1;
		/* Old nodesets are null and void */
		BIT_ZERO(p->proposer.prep_nodeset);
		BIT_ZERO(p->proposer.prop_nodeset);
		start = incr_synode(start);
	}
}

static void start_force_config(site_def *s)
{
	synode_no end = s->boot_key;

	synode_set_to_event_horizon(&end);

	DBGOUT(FN; SYCEXP(executed_msg); SYCEXP(end));
	if (synode_gt(end, max_synode))
		set_max_synode(end);

	free_site_def(forced_config);
	forced_config = s;
	force_interval(executed_msg, max_synode); /* Force everything in the pipeline */
}


/* Learn this value */
static void	handle_learn(site_def const * site, pax_machine *p, pax_msg *m)
{
	MAY_DBG(FN; STRLIT("proposer nodeset ");
	    dbg_bitset(p->proposer.prop_nodeset, get_maxnodes(site));
	    );
	MAY_DBG(FN; STRLIT("receivers ");
	    dbg_bitset(m->receivers, get_maxnodes(site));
	    );
	MAY_DBG(FN; NDBG(task_now(), f); SYCEXP(p->synode);
	    COPY_AND_FREE_GOUT(dbg_app_data(m->a));
	    );

	PAX_MSG_SANITY_CHECK(m);

	if (!finished(p)) { /* Avoid re-learn */
		do_learn(site, p, m);
		/* Check for special messages */
		if(m->a && m->a->body.c_t == unified_boot_type){
			DBGOUT(FN; STRLIT("Got unified_boot "); SYCEXP(p->synode); SYCEXP(m->synode); );
			XCOM_FSM(xa_net_boot, void_arg(m->a));
		}
		/* See if someone is forcing a new config */
		if(m->force_delivery && m->a){
			DBGOUT(FN; STRLIT("Got forced config "); SYCEXP(p->synode); SYCEXP(m->synode); );
			/* Configure all messages from executed_msg until start of new config
			   as forced messages so they will eventually be finished */
			/* Immediately install this new config */
			switch (m->a->body.c_t) {
			case add_node_type:
/* purecov: begin deadcode */
				start_force_config(clone_site_def(handle_add_node(m->a)));
				break;
/* purecov: end */
			case remove_node_type:
/* purecov: begin deadcode */
				start_force_config(clone_site_def(handle_remove_node(m->a)));
				break;
/* purecov: end */
			case force_config_type:
				start_force_config(clone_site_def(install_node_group(m->a)));
				break;
			default:
				break;
			}
			force_interval(executed_msg, getstart(m->a));
		}
	}

	task_wakeup(&p->rv);
}


/* Skip this value */
static void	handle_skip(site_def const * site, pax_machine *p, pax_msg *m)
{
	/*   MAY_DBG(FN;); */
	/*   MAY_DBG(FN; NDBG(task_now(),f); SYCEXP(p->msg->synode)); */
	if (!finished(p)) {
		skip_value(m);
		do_learn(site, p, m);
	}
	/*   MAY_DBG(FN; STRLIT("taskwakeup "); SYCEXP(p->msg->synode)); */
	task_wakeup(&p->rv);
}


static void	handle_client_msg(pax_msg *p)
{
	if (!p || p->a == NULL)
		/* discard invalid message */
		return;

	{
		msg_link * ml = msg_link_new(p, VOID_NODE_NO);

		/* Put it in the proposer queue */
		ADD_T_EV(task_now(), __FILE__, __LINE__, "handle_client_msg");
		channel_put(&prop_input_queue, &ml->l);
	}
}

/* Handle incoming alive message */
static double	sent_alive = 0.0;
static inline void	handle_alive(site_def const * site, linkage *reply_queue, pax_msg *pm)
{
	int not_to_oneself = (pm->from != get_nodeno(site) && pm->from != pm->to);
	DBGOUT(FN; SYCEXP(pm->synode); NDBG(pm->from,u); NDBG(pm->to,u); );

	/*
	  Check whether the ping is intended for us.
	  If the encoded node does not exist in the current configuration,
	  we avoid sending need_boot_op, since the ping must come from a
	  different incarnation of this node.
	*/
	if(site && pm->a && pm->a->body.c_t == xcom_boot_type)
	{
		DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&pm->a->body.app_u_u.nodes)););
		not_to_oneself &=
			node_exists_with_uid(&pm->a->body.app_u_u.nodes.node_list_val[0], &get_site_def()->nodes);
	}


	if (!client_boot_done && /* Already done? */
	    not_to_oneself && /* Not to oneself */
	    !is_dead_site(pm->group_id)) { /* Avoid dealing with zombies */
		double	t = task_now();
		if (t - sent_alive > 1.0) {
			CREATE_REPLY(pm);
			reply->op = need_boot_op;
			SEND_REPLY;
			sent_alive = t;
			DBGOUT(FN; STRLIT("sent need_boot_op"); );
		}
	}
}
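
/*
	Illustrative sketch, not compiled as part of XCom: while this node is
	still waiting to boot, handle_alive() throttles need_boot_op replies to
	at most one per second. It does this by comparing task_now() against
	the static sent_alive timestamp. The hypothetical sketch_throttle type
	below isolates that rule as a pure function so it can be exercised
	without the task scheduler. Times are plain doubles in seconds, as
	returned by task_now().

```c
#include <assert.h>

// Hypothetical throttle state: time of the last reply and the minimum gap.
typedef struct {
	double last_sent;
	double min_interval;
} sketch_throttle;

// Returns 1 and records `now` if more than min_interval seconds have
// passed since the last reply; otherwise returns 0 and changes nothing.
// Mirrors the `t - sent_alive > 1.0` test in handle_alive().
static int sketch_throttle_allow(sketch_throttle *th, double now)
{
	if (now - th->last_sent > th->min_interval) {
		th->last_sent = now;
		return 1;
	}
	return 0;
}
```

	Keeping the timestamp outside the per-message handler means that a burst
	of alive messages received before booting produces at most one
	need_boot_op per interval.
*/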


static void	update_max_synode(pax_msg *p)
{
#if 0
	/* if (group_id == 0 || synode_eq(max_synode, null_synode) ||  */
	/*     synode_gt(p->max_synode, max_synode)) { */
	/*   set_max_synode(p->max_synode); */
	/* } */
	if (group_id == 0 || synode_eq(max_synode, null_synode) ||
	    (p->msg_type == normal &&
	    (max_synode.group_id == 0 || p->synode.group_id == 0 ||
	    max_synode.group_id == p->synode.group_id) &&
	    synode_gt(p->synode, max_synode))) {
		set_max_synode(p->synode);
	}
#else
	if (is_dead_site(p->group_id))
		return;
	if (get_group_id(get_site_def()) == 0 || max_synode.group_id == 0) {
		set_max_synode(p->synode);
	} else if (max_synode.group_id == p->synode.group_id) {
		if (synode_gt(p->synode, max_synode)) {
			set_max_synode(p->synode);
		}
		if (synode_gt(p->max_synode, max_synode)) {
			set_max_synode(p->max_synode);
		}
	}
#endif
}
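
/*
	Illustrative sketch, not compiled as part of XCom: the active branch of
	update_max_synode() handles three cases:

	  - While no group is known, it adopts the incoming synode outright.
	  - It ignores messages from other groups.
	  - Otherwise, it keeps the largest of the current max_synode, the
	    message's synode, and the message's max_synode.

	The sketch assumes a simplified synode with group_id, msgno, and node
	fields. It also assumes that synode_gt() orders by msgno and then by
	node. The is_dead_site() check is omitted, and all sketch_* names are
	hypothetical.

```c
#include <assert.h>
#include <stdint.h>

// Simplified stand-in for synode_no (assumption: same three fields).
typedef struct {
	uint32_t group_id;
	uint64_t msgno;
	uint32_t node;
} sketch_synode;

// Assumed ordering: by message number, ties broken by node number.
static int sketch_synode_gt(sketch_synode x, sketch_synode y)
{
	return x.msgno > y.msgno || (x.msgno == y.msgno && x.node > y.node);
}

// Returns the new max_synode given the current one, the id of the
// installed group, and the synode/max_synode fields of an incoming message.
static sketch_synode sketch_update_max(sketch_synode cur, uint32_t site_group,
                                       sketch_synode msg_synode,
                                       sketch_synode msg_max)
{
	if (site_group == 0 || cur.group_id == 0)
		return msg_synode;          // no group yet: adopt the incoming synode
	if (cur.group_id != msg_synode.group_id)
		return cur;                 // message from another group: ignore it
	if (sketch_synode_gt(msg_synode, cur))
		cur = msg_synode;
	if (sketch_synode_gt(msg_max, cur))
		cur = msg_max;
	return cur;
}
```
*/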


/* Add app_data to message cache */
/* purecov: begin deadcode */
void	add_to_cache(app_data_ptr a, synode_no synode)
{
	pax_machine * pm = get_cache(synode);
	pax_msg * msg = pax_msg_new_0(synode);
	ref_msg(msg);
	assert(pm);
	safe_app_data_copy(&msg, a);
	if (msg != NULL) {
		set_learn_type(msg);
		/* msg->unique_id = a->unique_id; */
		do_learn(0, pm, msg);
		unref_msg(&msg);
	}
}
/* purecov: end */

/* }}} */

/* {{{ Message dispatch */
#define BAL_FMT "ballot {cnt %d node %d}"
#define BAL_MEM(x) (x).cnt, (x).node

static int clicnt = 0;

static u_int is_reincarnation_adding(app_data_ptr a)
{
	/* Get information on the current site definition */
	const site_def* new_site_def= get_site_def();
	const site_def* valid_site_def= find_site_def(executed_msg);

	/* Get information on the nodes to be added */
	u_int nodes_len  = a->body.app_u_u.nodes.node_list_len;
	node_address* nodes_to_change= a->body.app_u_u.nodes.node_list_val;

	u_int i = 0;
	for(; i < nodes_len; i++)
	{
		if (node_exists(&nodes_to_change[i], &new_site_def->nodes) ||
			node_exists(&nodes_to_change[i], &valid_site_def->nodes))
		{
			/*
			We reject the attempt to add a node to the group
			when there is an old incarnation of it, meaning that
			the node has crashed and restarted so quickly that
			nobody has noticed that it was gone.

			In XCOM, the group is not automatically reconfigured
			and it is possible to start reusing a node that has
			crashed and restarted without reconfiguring the group
			by adding the node back to it.

			However, this operation may be unsafe because XCOM
			does not implement a crash-recovery model and nodes
			suffer from amnesia after restarting the service. In
			other words, this may lead to inconsistency issues in
			the Paxos protocol.

			Unfortunately, refusing to add a node back to the
			group while an old incarnation of it exists does not
			fix this problem by itself, since other changes are
			also required.
			*/
			G_MESSAGE("Old incarnation found while trying to add node %s %.*s.",
				  nodes_to_change[i].address,
				  nodes_to_change[i].uuid.data.data_len,
				  nodes_to_change[i].uuid.data.data_val
			);
			return 1;
		}
	}

	return 0;
}

static u_int is_reincarnation_removing(app_data_ptr a)
{
	/* Get information on the current site definition */
	const site_def* new_site_def= get_site_def();

	/* Get information on the nodes to be removed */
	u_int nodes_len = a->body.app_u_u.nodes.node_list_len;
	node_address* nodes_to_change= a->body.app_u_u.nodes.node_list_val;

	u_int i = 0;
	for(; i < nodes_len; i++)
	{
		if (!node_exists_with_uid(&nodes_to_change[i], &new_site_def->nodes))
		{
			/*
			We cannot allow an upper layer that tries to remove an
			old incarnation of a node to remove the new incarnation
			instead.
			*/
			G_MESSAGE("Old incarnation found while trying to "
				  "remove node %s %.*s.",
				  nodes_to_change[i].address,
				  nodes_to_change[i].uuid.data.data_len,
				  nodes_to_change[i].uuid.data.data_val
			);

			return 1;
		}
	}

	return 0;
}
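
/*
	Illustrative sketch, not compiled as part of XCom: the two
	reincarnation checks differ in how a node is matched:

	  - Adding is refused if any member already has the same address
	    (node_exists()).
	  - Removing is refused unless a member matches both the address and
	    the UUID (node_exists_with_uid()).

	The sketch assumes that matching rule and flattens node_address to
	two strings. It checks a single configuration, whereas
	is_reincarnation_adding() also consults the configuration at
	executed_msg. The sketch_* names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

// Hypothetical, flattened node record: address plus incarnation UUID.
typedef struct {
	const char *address;
	const char *uuid;
} sketch_node;

// Assumed matching rule: address always compared, UUID only on request.
static int sketch_node_in(const sketch_node *n, const sketch_node *cfg,
                          size_t len, int with_uid)
{
	size_t i;
	for (i = 0; i < len; i++) {
		if (strcmp(n->address, cfg[i].address) == 0 &&
		    (!with_uid || strcmp(n->uuid, cfg[i].uuid) == 0))
			return 1;
	}
	return 0;
}

// Add is refused while any incarnation with that address is a member.
static int sketch_add_is_reincarnation(const sketch_node *n,
                                       const sketch_node *cfg, size_t len)
{
	return sketch_node_in(n, cfg, len, 0);
}

// Remove is refused when the requested incarnation (address + UUID)
// is not the one present in the configuration.
static int sketch_remove_is_reincarnation(const sketch_node *n,
                                          const sketch_node *cfg, size_t len)
{
	return !sketch_node_in(n, cfg, len, 1);
}
```
*/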

/**
 * Logs a warning for each node in an add/remove node request that is aimed at another group.
 *
 * @param a a pointer to the app_data of the configuration command
 * @param message_fmt a printf-style format string containing a single %s, which is replaced by each node's address
 */
static void log_cfgchange_wrong_group(app_data_ptr a, const char *const message_fmt)
{
	u_int const nr_nodes = a->body.app_u_u.nodes.node_list_len;
	u_int i;
	for (i = 0; i < nr_nodes; i++)
	{
		char const *const address = a->body.app_u_u.nodes.node_list_val[i].address;
		G_WARNING(message_fmt, address);
	}
}

/**
 * Validates if a configuration command can be executed.
 * Checks whether the configuration command is aimed at the correct group.
 * Checks whether the configuration command pertains to a node reincarnation.
 *
 * @param p a pointer to the pax_msg of the configuration command
 * @retval REQUEST_OK if the reconfiguration command can be executed
 * @retval REQUEST_RETRY if XCom is still booting
 * @retval REQUEST_FAIL if the configuration command cannot be executed
 */
static client_reply_code can_execute_cfgchange(pax_msg *p)
{
	app_data_ptr a = p->a;

	if (executed_msg.msgno <= 2)
		return REQUEST_RETRY;

	if (a && a->group_id != 0 && a->group_id != executed_msg.group_id)
	{
		switch (a->body.c_t) {
		case add_node_type:
			log_cfgchange_wrong_group(a, "The request to add %s to the group has been rejected because it is aimed at another group");
			break;
		case remove_node_type:
			log_cfgchange_wrong_group(a, "The request to remove %s from the group has been rejected because it is aimed at another group");
			break;
		case force_config_type:
			G_WARNING("The request to force the group membership has been rejected because it is aimed at another group");
			break;
		default:
			assert(0 && "A cargo_type different from {add_node_type, remove_node_type, force_config_type} should not have hit this code path");
		}
		return REQUEST_FAIL;
	}

	if (a && a->body.c_t == add_node_type && is_reincarnation_adding(a))
		return REQUEST_FAIL;

	if (a && a->body.c_t == remove_node_type && is_reincarnation_removing(a))
		return REQUEST_FAIL;

	return REQUEST_OK;
}
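
/*
	Illustrative sketch, not compiled as part of XCom: the order of the
	checks in can_execute_cfgchange() matters. A request that arrives
	before XCom has executed past message number 2 is told to retry, even
	if it is aimed at the wrong group. The hypothetical
	sketch_can_execute() reproduces that order, with the reincarnation
	test reduced to a precomputed flag.

```c
#include <assert.h>
#include <stdint.h>

// Hypothetical mirror of client_reply_code.
typedef enum { SK_REQUEST_OK, SK_REQUEST_FAIL, SK_REQUEST_RETRY } sketch_reply;

// Same order of checks as can_execute_cfgchange(): still booting,
// wrong group, then reincarnation.
static sketch_reply sketch_can_execute(uint64_t executed_msgno,
                                       uint32_t req_group, uint32_t cur_group,
                                       int is_reincarnation)
{
	if (executed_msgno <= 2)
		return SK_REQUEST_RETRY;    // XCom is still booting
	if (req_group != 0 && req_group != cur_group)
		return SK_REQUEST_FAIL;     // aimed at another group
	if (is_reincarnation)
		return SK_REQUEST_FAIL;     // old/new incarnation mismatch
	return SK_REQUEST_OK;
}
```
*/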

static void activate_sweeper()
{
	if (sweeper) {
		ADD_EVENTS(
			add_event(string_arg("sweeper activated max_synode"));
			add_synode_event(max_synode);
		);
		task_activate(sweeper);
	}
}

pax_msg *dispatch_op(site_def const *site, pax_msg *p, linkage *reply_queue)
{
	pax_machine * pm = NULL;
	site_def * dsite = find_site_def_rw(p->synode);
	int	in_front = too_far(p->synode);

	if(p->force_delivery){
		/* Ensure that forced message can be processed */
		in_front = 0;
	}

	if (dsite && p->op != client_msg){
		note_detected(dsite, p->from);
		update_delivered(dsite, p->from, p->delivered_msg);
	}

	MAY_DBG(FN; STRLIT("incoming message ");
			COPY_AND_FREE_GOUT(dbg_pax_msg(p));
	    );
	ADD_EVENTS(
		add_synode_event(p->synode);
		add_event(string_arg("p->from"));
		add_event(int_arg(p->from));
		add_event(string_arg(pax_op_to_str(p->op)));
	);
	switch (p->op) {
	case client_msg:
		clicnt++;
		if (p->a && (p->a->body.c_t == enable_arbitrator)) {
			CREATE_REPLY(p);
			DBGOUT(FN; STRLIT("Got enable_arbitrator from client"); SYCEXP(p->synode); );
			ARBITRATOR_HACK = 1;
			reply->op = xcom_client_reply;
			reply->cli_err = REQUEST_OK;
			SEND_REPLY;
			break;
		}
		if (p->a && (p->a->body.c_t == disable_arbitrator)) {
			CREATE_REPLY(p);
			DBGOUT(FN; STRLIT("Got disable_arbitrator from client"); SYCEXP(p->synode); );
			ARBITRATOR_HACK = 0;
			reply->op = xcom_client_reply;
			reply->cli_err = REQUEST_OK;
			SEND_REPLY;
			break;
		}
		if (p->a && (p->a->body.c_t == set_cache_limit)) {
			CREATE_REPLY(p);
			DBGOUT(FN; STRLIT("Got set_cache_limit from client"); SYCEXP(p->synode); );
			if(the_app_xcom_cfg){
				set_max_cache_size(p->a->body.app_u_u.cache_limit);
				reply->cli_err = REQUEST_OK;
			}else{
				reply->cli_err = REQUEST_FAIL;
			}
			reply->op = xcom_client_reply;
			SEND_REPLY;
			break;
		}
		if (p->a && (p->a->body.c_t == x_terminate_and_exit)) {
			CREATE_REPLY(p);
			DBGOUT(FN; STRLIT("Got terminate_and_exit from client"); SYCEXP(p->synode); );
			reply->op = xcom_client_reply;
			reply->cli_err = REQUEST_OK;
			SEND_REPLY;
			/*
			  terminate_and_exit() frees the site definitions that
			  SEND_REPLY uses, so it must be called after SEND_REPLY.
			*/
			terminate_and_exit();
			break;
		}
		if (p->a && (p->a->body.c_t == add_node_type ||
					 p->a->body.c_t == remove_node_type ||
					 p->a->body.c_t == force_config_type)) {
			client_reply_code cli_err;
			CREATE_REPLY(p);
			reply->op = xcom_client_reply;
			reply->cli_err = cli_err = can_execute_cfgchange(p);
			SEND_REPLY;
			if (cli_err != REQUEST_OK) {
				break;
			}
		}
		if (p->a && p->a->body.c_t == unified_boot_type) {
			DBGOUT(FN; STRLIT("Got unified_boot from client"); SYCEXP(p->synode); );
			DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&p->a->body.app_u_u.nodes)); );
			DBGOUT(STRLIT("handle_client_msg "); NDBG(p->a->group_id, x));
			XCOM_FSM(xa_net_boot, void_arg(p->a));
		}
		if (p->a && p->a->body.c_t == add_node_type) {
			DBGOUT(FN; STRLIT("Got add_node from client"); SYCEXP(p->synode); );
			DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&p->a->body.app_u_u.nodes)); );
			DBGOUT(STRLIT("handle_client_msg "); NDBG(p->a->group_id, x));
			assert(get_site_def());
		}
		if (p->a && p->a->body.c_t == remove_node_type) {
			DBGOUT(FN; STRLIT("Got remove_node from client"); SYCEXP(p->synode); );
			DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&p->a->body.app_u_u.nodes)); );
			DBGOUT(STRLIT("handle_client_msg "); NDBG(p->a->group_id, x));
			assert(get_site_def());
		}
		if (p->a && p->a->body.c_t == force_config_type) {
			DBGOUT(FN; STRLIT("Got new config from client"); SYCEXP(p->synode); );
			DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&p->a->body.app_u_u.nodes)); );
			DBGOUT(STRLIT("handle_client_msg "); NDBG(p->a->group_id, x));
			assert(get_site_def());
			XCOM_FSM(xa_force_config, void_arg(p->a));
		}
		handle_client_msg(p);
		break;
	case initial_op:
		break;
	case read_op:
		pm = get_cache(p->synode);
		assert(pm);
		if(client_boot_done)
			handle_alive(site, reply_queue, p);
		handle_read(site, pm, reply_queue, p);
		break;
	case prepare_op:
		pm = get_cache(p->synode);
		assert(pm);
		if(p->force_delivery)
			pm->force_delivery = 1;
		pm->last_modified = task_now();
		if(client_boot_done)
			handle_alive(site, reply_queue, p);
		handle_prepare(site, pm, reply_queue, p);
		break;
	case ack_prepare_op:
		if (in_front || !is_cached(p->synode))
			break;
		pm = get_cache(p->synode);
		if(p->force_delivery)
			pm->force_delivery = 1;
		if (!pm->proposer.msg)
			break;
		assert(pm && pm->proposer.msg);
		handle_ack_prepare(site, pm, p);
		break;
	case ack_prepare_empty_op:
		if (in_front || !is_cached(p->synode))
			break;
		pm = get_cache(p->synode);
		if(p->force_delivery)
			pm->force_delivery = 1;
		if (!pm->proposer.msg)
			break;
		assert(pm && pm->proposer.msg);
		handle_ack_prepare_empty(site, pm, p);
		break;
	case accept_op:
		pm = get_cache(p->synode);
		assert(pm);
		if(p->force_delivery)
			pm->force_delivery = 1;
		pm->last_modified = task_now();
		handle_alive(site, reply_queue, p);
		handle_accept(site, pm, reply_queue, p);
		break;
	case ack_accept_op:
		if (in_front || !is_cached(p->synode))
			break;
		pm = get_cache(p->synode);
		if(p->force_delivery)
			pm->force_delivery = 1;
		if (!pm->proposer.msg)
			break;
		assert(pm && pm->proposer.msg);
		handle_ack_accept(site, pm, p);
		break;
	case recover_learn_op:
		DBGOUT(FN; STRLIT("recover_learn_op receive "); SYCEXP(p->synode));
		pm = get_cache(p->synode);
		assert(pm);
		if(p->force_delivery)
			pm->force_delivery = 1;
		pm->last_modified = task_now();
		update_max_synode(p);
		{
			DBGOUT(FN; STRLIT("recover_learn_op learn "); SYCEXP(p->synode));
			p->op = learn_op;
			handle_learn(site, pm, p);
		}
		break;
	case learn_op:
learnop:
		pm = get_cache(p->synode);
		assert(pm);
		if(p->force_delivery)
			pm->force_delivery = 1;
		pm->last_modified = task_now();
		update_max_synode(p);
		activate_sweeper();
		handle_learn(site, pm, p);
		break;
	case tiny_learn_op:
		if (p->msg_type == no_op)
			goto learnop;
		pm = get_cache(p->synode);
		assert(pm);
		if(p->force_delivery)
			pm->force_delivery = 1;
		if (pm->acceptor.msg) {
			/* 			BALCEXP(pm->acceptor.msg->proposal); */
			if (eq_ballot(pm->acceptor.msg->proposal, p->proposal)) {
				pm->acceptor.msg->op = learn_op;
				pm->last_modified = task_now();
				update_max_synode(p);
				activate_sweeper();
				handle_learn(site, pm, pm->acceptor.msg);
			} else {
				send_read(p->synode);
				DBGOUT(FN; STRLIT("tiny_learn"); SYCEXP(p->synode);
				    BALCEXP(pm->acceptor.msg->proposal); BALCEXP(p->proposal));
			}
		} else {
			send_read(p->synode);
			DBGOUT(FN; STRLIT("tiny_learn"); SYCEXP(p->synode);
			    BALCEXP(p->proposal));
		}
		break;
	case skip_op:
		pm = get_cache(p->synode);
		assert(pm);
		if(p->force_delivery)
			pm->force_delivery = 1;
		pm->last_modified = task_now();
		handle_skip(site, pm, p);
		break;
	case i_am_alive_op:
		handle_alive(site, reply_queue, p);
		break;
	case are_you_alive_op:
		handle_alive(site, reply_queue, p);
		break;
	case need_boot_op:
/* purecov: begin deadcode */
		XCOM_FSM(xa_need_snapshot, void_arg(p));
		break;
/* purecov: end */
	case snapshot_op:
		if (!is_dead_site(p->group_id)) {
			update_max_synode(p);
		}
		break;
	case gcs_snapshot_op:
		if (!is_dead_site(p->group_id)) {
			update_max_synode(p);
			XCOM_FSM(xa_snapshot, void_arg(p));
			XCOM_FSM(xa_complete,int_arg(0));
		}
		break;
	case die_op:
		/* assert("die horribly" == "need coredump"); */
		 {
			GET_GOUT;
			FN;
			STRLIT("die_op ");
			SYCEXP(executed_msg);
			SYCEXP(delivered_msg);
			SYCEXP(p->synode);
			SYCEXP(p->delivered_msg);
			SYCEXP(p->max_synode);
			PRINT_GOUT;
			FREE_GOUT;
		}
		/*
		If the message whose number is in the incoming die_op message has
		already been executed (delivered), then we actually got consensus
		on it, since otherwise we would not have delivered it. Such a
		situation could arise if one of the nodes has expelled the message
		from its cache, but others have not.

		So when sending out a request, we might get two different answers:
		one indicating that we are too far behind and should restart, and
		another with the actual consensus value. If the value arrives
		first, we will deliver it, and then the die_op may arrive later.
		In this case it does not matter, since we got what we needed
		anyway.

		This is, of course, only a partial guard against exiting
		unnecessarily, since the die_op may arrive first, and we do not
		wait for a die_op from all the other nodes. We could do that with
		some extra housekeeping in the pax_machine (a new bit vector), but
		I am not convinced that it is worth the effort.
		*/
		if(!synode_lt(p->synode, executed_msg)){
			g_critical("Node %u unable to get messages, since the "
				   "group is too far ahead. Node will now exit.",
				get_nodeno(site));
			terminate_and_exit();
		}
		break;
	default:
		break;
	}
	if (oom_abort) {
	  g_critical("Node %u has run out of memory and will now exit.",
	             get_nodeno(site));
	  terminate_and_exit();
	}
	return(p);
}
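
/*
	Illustrative sketch, not compiled as part of XCom: a tiny_learn_op
	carries the ballot of the chosen value but not the value itself. As a
	result, dispatch_op() can complete the learn locally only when its
	acceptor holds a value accepted under an equal ballot. Otherwise, it
	asks for the value with send_read().

	The sketch assumes that two ballots are equal when both their cnt and
	node fields match. It omits the no_op case, which dispatch_op()
	handles as a full learn_op. The sketch_* names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

// Hypothetical mirror of a ballot: (counter, proposing node).
typedef struct {
	int cnt;
	int node;
} sketch_ballot;

typedef enum { SK_LEARN_LOCALLY, SK_SEND_READ } sketch_tiny_action;

// `accepted` is the ballot of the locally accepted value, or NULL if
// this acceptor has not accepted anything for the slot.
static sketch_tiny_action sketch_on_tiny_learn(const sketch_ballot *accepted,
                                               sketch_ballot learned)
{
	if (accepted != NULL && accepted->cnt == learned.cnt &&
	    accepted->node == learned.node)
		return SK_LEARN_LOCALLY;    // promote the accepted value to learned
	return SK_SEND_READ;            // value unknown or different: fetch it
}
```
*/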



/* }}} */

/* {{{ Acceptor-learner task */
int	acceptor_learner_task(task_arg arg)
{
	DECL_ENV
	    connection_descriptor rfd;
	srv_buf *in_buf;

	pax_msg * p;
	u_int	buflen;
	char	*buf;
	linkage reply_queue;
	int	errors;
	server *srv;
	END_ENV;

	TASK_BEGIN

		ep->in_buf = calloc(1, sizeof(srv_buf));

	ep->rfd.fd = get_int_arg(arg);
#ifdef XCOM_HAVE_OPENSSL
	ep->rfd.ssl_fd = 0;
#endif
	ep->p = NULL;
	ep->buflen = 0;
	ep->buf = NULL;
	ep->errors = 0;
	ep->srv = 0;

	/* We have a connection, make socket non-blocking and wait for request */
	unblock_fd(ep->rfd.fd);
	set_nodelay(ep->rfd.fd);
	wait_io(stack, ep->rfd.fd, 'r');
	TASK_YIELD;

#ifdef XCOM_HAVE_OPENSSL
	if (xcom_use_ssl()) {
		ep->rfd.ssl_fd = SSL_new(server_ctx);
		SSL_set_fd(ep->rfd.ssl_fd, ep->rfd.fd);

		{
			int	ret_ssl;
			int	err;
			ERR_clear_error();
			ret_ssl = SSL_accept(ep->rfd.ssl_fd);
			err = SSL_get_error(ep->rfd.ssl_fd, ret_ssl);

			while (ret_ssl != SSL_SUCCESS) {
				if (err == SSL_ERROR_WANT_READ){
					wait_io(stack, ep->rfd.fd, 'r');
				} else if (err == SSL_ERROR_WANT_WRITE){
					wait_io(stack, ep->rfd.fd, 'w');
				} else { /* Some other error, give up */
					break;
				}
				TASK_YIELD;
				SET_OS_ERR(0);
				G_DEBUG("acceptor learner accept retry fd %d", ep->rfd.fd);
				ERR_clear_error();
				ret_ssl = SSL_accept(ep->rfd.ssl_fd);
				err = SSL_get_error(ep->rfd.ssl_fd, ret_ssl);
			}

			if (ret_ssl != SSL_SUCCESS) {
				ssl_free_con(&ep->rfd);
				close_connection(&ep->rfd);
				TERMINATE;
			}
		}

	} else {
		ep->rfd.ssl_fd = 0;
	}
#endif
	set_connected(&ep->rfd, CON_FD);
	link_init(&ep->reply_queue,  type_hash("msg_link"));

	while (!xcom_shutdown) {
		int64_t	n;
		site_def const * site = 0;
		unchecked_replace_pax_msg(&ep->p, pax_msg_new_0(null_synode));

		if(use_buffered_read){
			TASK_CALL(buffered_read_msg(&ep->rfd, ep->in_buf, ep->p, ep->srv, &n));
		}else{
			TASK_CALL(read_msg(&ep->rfd, ep->p, ep->srv, &n));
		}
		if (((int)ep->p->op < (int)client_msg || ep->p->op > LAST_OP)) {
			/* invalid operation, ignore message */
			delete_pax_msg(ep->p);
			ep->p = NULL;
			TASK_YIELD;
			continue;
		}
		if (n <= 0) {
			break;
		}
		site = find_site_def(ep->p->synode);
		/*
			Getting a pointer to the server needs to be done after we have
			received a message, since without having received a message, we
			cannot know who it is from. We could peek at the message and
			deserialize the message number and from field, but since the
			server does not change, it should be sufficient to cache the
			server in the acceptor_learner task. A cleaner solution would
			have been to move the timestamps out of the server object, and
			have a map indexed by IP/port or UUID to track the timestamps,
			since this is common to the sender_task, reply_handler_task, and
			acceptor_learner_task.
		*/
		// Allow the previous server reference to be freed.
		if (ep->srv) srv_unref(ep->srv);
		ep->srv = get_server(site, ep->p->from);
		// Prevent the new server reference from being freed.
		if (ep->srv) srv_ref(ep->srv);
		ep->p->refcnt = 1; /* Refcnt from other end is void here */
		MAY_DBG(FN;
				NDBG(ep->rfd.fd, d); NDBG(task_now(), f);
				COPY_AND_FREE_GOUT(dbg_pax_msg(ep->p));
				);
		receive_count[ep->p->op]++;
		receive_bytes[ep->p->op] += (uint64_t)n + MSG_HDR_SIZE;
		{
			gboolean behind = FALSE;
			if (get_maxnodes(site) > 0) {
				behind = ep->p->synode.msgno < delivered_msg.msgno;
			}
			ADD_EVENTS(
				add_event(string_arg("before dispatch "));
				add_synode_event(ep->p->synode);
				add_event(string_arg("ep->p->from"));
				add_event(int_arg(ep->p->from));
				add_event(string_arg(pax_op_to_str(ep->p->op)));
			);
			if (ep->p->msg_type == normal ||
			    ep->p->synode.msgno == 0 || /* Used by i-am-alive and so on */
				is_cached(ep->p->synode) || /* Already in cache */
				(!behind)) { /* Guard against cache pollution from other nodes */
				dispatch_op(site, ep->p, &ep->reply_queue);

				/* Send replies on same fd */
				while (!link_empty(&ep->reply_queue)) {
					msg_link * reply = (msg_link * )(link_extract_first(&ep->reply_queue));
					MAY_DBG(FN;
					    COPY_AND_FREE_GOUT(dbg_linkage(&ep->reply_queue));
					    COPY_AND_FREE_GOUT(dbg_msg_link(reply));
					    COPY_AND_FREE_GOUT(dbg_pax_msg(reply->p));
					    );
					assert(reply->p);
					assert(reply->p->refcnt > 0);
					reply->p->to = ep->p->from;
					reply->p->from = ep->p->to;
					reply->p->delivered_msg = get_delivered_msg();
					reply->p->max_synode = get_max_synode();
					serialize_msg(reply->p, ep->rfd.x_proto, &ep->buflen, &ep->buf);
					MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_msg_link(reply));
							COPY_AND_FREE_GOUT(dbg_pax_msg(reply->p)));
					msg_link_delete(&reply);
					if(ep->buflen){
						int64_t	sent;
						TASK_CALL(task_write(&ep->rfd , ep->buf, ep->buflen, &sent));
						send_count[ep->p->op]++;
						send_bytes[ep->p->op] += ep->buflen;
						X_FREE(ep->buf);
					}
					ep->buf = NULL;
				}
			} else {
				DBGOUT(FN; STRLIT("rejecting ");
					   STRLIT(pax_op_to_str(ep->p->op));
					   NDBG(ep->p->from, d); NDBG(ep->p->to, d);
					   SYCEXP(ep->p->synode);
					   BALCEXP(ep->p->proposal));
				if (xcom_booted() && behind) {
#ifdef USE_EXIT_TYPE
					if (ep->p->op == prepare_op) {
						miss_prepare(ep->p, &ep->reply_queue);
					} else if (ep->p->op == accept_op) {
						miss_accept(ep->p, &ep->reply_queue);
					}
#else
					if (/*ep->p->op == prepare_op && */ was_removed_from_cache(ep->p->synode)) {
						DBGOUT(FN; STRLIT("send_die ");
							   STRLIT(pax_op_to_str(ep->p->op));
							   NDBG(ep->p->from, d); NDBG(ep->p->to, d);
							   SYCEXP(ep->p->synode);
							   BALCEXP(ep->p->proposal));
						if (get_maxnodes(site) > 0) {
							pax_msg * np = NULL;
							np = pax_msg_new(ep->p->synode, site);
							ref_msg(np);
							np->op = die_op;
							np->to = ep->p->from;
							np->from = ep->p->to;
							np->delivered_msg = get_delivered_msg();
							np->max_synode = get_max_synode();
							DBGOUT(FN; STRLIT("sending die_op to node "); NDBG(np->to, d);
								SYCEXP(executed_msg); SYCEXP(max_synode); SYCEXP(np->synode));
							serialize_msg(np, ep->rfd.x_proto, &ep->buflen, &ep->buf);
							if(ep->buflen){
								int64_t	sent;
								TASK_CALL(task_write(&ep->rfd , ep->buf, ep->buflen, &sent));
								send_count[ep->p->op]++;
								send_bytes[ep->p->op] += ep->buflen;
								X_FREE(ep->buf);
							}
							ep->buf = NULL;
							unref_msg(&np);
						}

					}
#endif
				}
			}
		}
		/* TASK_YIELD; */
	}

	FINALLY
	    MAY_DBG(FN; STRLIT(" shutdown "); NDBG(ep->rfd.fd, d ); NDBG(task_now(), f));
	if(ep->reply_queue.suc && !link_empty(&ep->reply_queue))
	    empty_msg_list(&ep->reply_queue);
	unchecked_replace_pax_msg(&ep->p, NULL);
	shutdown_connection(&ep->rfd);
	DBGOUT(FN; NDBG(xcom_shutdown, d));
	if (ep->buf)
		X_FREE(ep->buf);
	free(ep->in_buf);
	// Allow the server reference to be freed.
	if (ep->srv) srv_unref(ep->srv);

	TASK_END;
}
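
/*
	Illustrative sketch, not compiled as part of XCom: acceptor_learner_task()
	dispatches a message only if at least one of these holds:

	  - it is a normal message;
	  - it uses message number 0 (i-am-alive and similar);
	  - it refers to a slot already in the cache;
	  - it is not behind delivered_msg.

	This guards the local cache against pollution from other nodes.
	A rejected message that refers to a slot already removed from the
	cache may instead trigger a die_op reply. The predicate below restates
	the guard with all inputs precomputed. sketch_should_dispatch() is
	hypothetical.

```c
#include <assert.h>
#include <stdint.h>

// Returns 1 if an incoming message should be dispatched. `have_nodes`
// stands for get_maxnodes(site) > 0, without which no message counts
// as behind.
static int sketch_should_dispatch(int is_normal, uint64_t msgno,
                                  int is_cached, uint64_t delivered_msgno,
                                  int have_nodes)
{
	int behind = have_nodes && msgno < delivered_msgno;
	return is_normal || msgno == 0 || is_cached || !behind;
}
```
*/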


/* }}} */

/* {{{ Reply handler task */
int const need_boot_special = 1;

static void	server_handle_need_snapshot(server *srv, site_def const *s, node_no node);

int	reply_handler_task(task_arg arg)
{
	DECL_ENV
	    server * s;
	pax_msg * reply;
	END_ENV;

	TASK_BEGIN

	ep->s = (server * )get_void_arg(arg);
	srv_ref(ep->s);
	ep->reply = NULL;

	for (; ; ) {
		while (!is_connected(&ep->s->con)) {
			MAY_DBG(FN; STRLIT("waiting for connection"));
			TASK_DELAY(1.000);
		}
		{
			int64_t	n;
			unchecked_replace_pax_msg(&ep->reply, pax_msg_new_0( null_synode));

			ADD_EVENTS(
				add_event(string_arg("ep->s->con.fd"));
				add_event(int_arg(ep->s->con.fd));
			);
			TASK_CALL(read_msg(&ep->s->con, ep->reply, ep->s, &n));
			ADD_EVENTS(
				add_event(string_arg("ep->s->con.fd"));
				add_event(int_arg(ep->s->con.fd));
			);
			ep->reply->refcnt = 1; /* Refcnt from other end is void here */
			if (n <= 0) {
				shutdown_connection(&ep->s->con);
				continue;
			}
			receive_bytes[ep->reply->op] += (uint64_t)n + MSG_HDR_SIZE;
		}
		MAY_DBG(FN;
		    NDBG(ep->s->con.fd, d); NDBG(task_now(), f);
		    COPY_AND_FREE_GOUT(dbg_pax_msg(ep->reply));
		    );
		receive_count[ep->reply->op]++;

		/* Special test for need_snapshot, since node and site may not be consistent */
		if (need_boot_special && ep->reply->op == need_boot_op) {
			pax_msg * p = ep->reply;
			server_handle_need_snapshot(ep->s, get_site_def(), p->from);
		}else{
			// Only handle messages from this connection if the server is valid.
			if(ep->s->invalid == 0)
				dispatch_op(find_site_def(ep->reply->synode), ep->reply, NULL);
		}
		TASK_YIELD;
	}

	FINALLY
	    replace_pax_msg(&ep->reply, NULL);

	shutdown_connection(&ep->s->con);
	ep->s->reply_handler = NULL;
	MAY_DBG(FN; STRLIT(" shutdown "); NDBG(ep->s->con.fd, d); NDBG(task_now(), f));
	srv_unref(ep->s);

	TASK_END;
}


/* }}} */


/* purecov: begin deadcode */
static inline void xcom_sleep(unsigned int seconds)
{
#if defined (WIN32) || defined (WIN64)
	Sleep((DWORD)seconds*1000); /* Windows Sleep() takes milliseconds */
#else
	sleep(seconds);
#endif
}

/* purecov: end */

/*
 * Get a long that is intended to be unique, as the basis for XCom group
 * id creation.
 *
 * NOTE:
 * As there is no gethostid() on Windows, we use seconds since the epoch
 * (XORed with the process id) instead, so creating sites simultaneously
 * within the same second might produce the same value.
 */
long	get_unique_long(void)
{
#if defined(WIN32) || defined(WIN64)
	__time64_t ltime;

	_time64( &ltime );
	return (long) (ltime ^ GetCurrentProcessId());
#else
	return gethostid() ^ getpid();
#endif
}
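
/*
	Illustrative sketch, not compiled as part of XCom: on POSIX systems,
	get_unique_long() XORs the host id with the process id. XOR is not
	injective, so two different (host id, pid) pairs can produce the same
	value. The result is therefore a best-effort seed rather than a
	guaranteed-unique id. The hypothetical sketch_unique_long() takes both
	inputs as parameters to make that visible.

```c
#include <assert.h>

// Same combination as the POSIX branch of get_unique_long(), with the
// host id and process id passed in so the result is predictable.
static long sketch_unique_long(long hostid, long pid)
{
	return hostid ^ pid;
}
```

	For example, host id 0x10 with pid 0x01 and host id 0x11 with pid 0x00
	both yield 0x11.
*/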


/* {{{ Coroutine macros */
/*
   Coroutine device (or more precisely, a finite state machine, as the
   stack is not preserved), described by its inventor Tom Duff as
   being "too horrid to go into". The basic idea is that the switch
   can be used to jump anywhere in the code, so we note where we are
   when we return, and jump there when we enter the routine again by
   switching on the state, which is really a line number supplied by
   the CO_RETURN macro.
*/

#define CO_BEGIN switch(state){ default: assert(state == 0); case 0:
#define CO_END    }

#define CO_RETURN(x)                \
  {                                             \
    state = __LINE__;                          \
    return x;                                   \
  case __LINE__:;                               \
  }

#define HALT(x) while(1){ CO_RETURN(x);}
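
/*
	Illustrative sketch, not compiled as part of XCom: a tiny generator
	built from single-line copies of the macros above. A CO_RETURN resumes
	by jumping to a case label rather than by restoring a stack frame, so
	any variable that must survive between calls has to be static. Here
	that variable is the loop counter, which is the "stack is not
	preserved" caveat mentioned above.

	sketch_count_to_three() is hypothetical. Each CO_RETURN must sit on
	its own source line, since __LINE__ serves as the case label.

```c
#include <assert.h>

// Single-line copies of the coroutine macros; they expect a variable
// named `state` in scope.
#define CO_BEGIN switch (state) { default: assert(state == 0); case 0:
#define CO_END }
#define CO_RETURN(x) { state = __LINE__; return x; case __LINE__:; }
#define HALT(x) while (1) { CO_RETURN(x); }

// Yields 1, 2, 3 on successive calls, then -1 forever.
static int sketch_count_to_three(void)
{
	static int state = 0;   // resume point (a line number)
	static int i;           // must be static: locals are not preserved
	CO_BEGIN
	for (i = 1; i <= 3; i++)
		CO_RETURN(i);
	HALT(-1);
	CO_END
	return -1;              // not reached
}
```
*/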

/* purecov: begin deadcode */
void send_app_data(app_data_ptr a)
{
	pax_msg * msg = pax_msg_new(null_synode, get_proposer_site());
	xcom_send(a, msg);
}

void	xcom_send_data(uint32_t size, char *data)
{
	app_data_ptr a = new_app_data();
	a->body.c_t = app_type;
	a->body.app_u_u.data.data_len = size;
	a->body.app_u_u.data.data_val = data;
	send_app_data(a);
}

app_data_ptr create_config(node_list *nl, cargo_type type)
{
	app_data_ptr a = new_app_data();
	a->body.c_t = type;
	init_node_list(nl->node_list_len, nl->node_list_val, &a->body.app_u_u.nodes);
	return a;
}
/* purecov: end */

app_data_ptr init_config_with_group(app_data *a, node_list *nl, cargo_type type,
                                    uint32_t group_id)
{
	init_app_data(a);
	a->app_key.group_id = a->group_id = group_id;
	a->body.c_t = type;
	init_node_list(nl->node_list_len, nl->node_list_val, &a->body.app_u_u.nodes);
	return a;
}

/* purecov: begin deadcode */
app_data_ptr create_config_with_group(node_list *nl, cargo_type type,
                                      uint32_t group_id)
{
	app_data_ptr a = new_app_data();
	return init_config_with_group(a, nl, type, group_id);
}

void send_boot(node_list *nl)
{	app_data_ptr a = create_config(nl, unified_boot_type);
	install_node_group(a); 	/* Cannot get consensus unless group is known */
	send_app_data(a);
}

void send_add_node(node_list *nl)
{
	send_app_data(create_config(nl, add_node_type));
}

void send_remove_node(node_list *nl)
{
	send_app_data(create_config(nl, remove_node_type));
}

void send_config(node_list *nl)
{
	send_app_data(create_config(nl, force_config_type));
}

void send_client_app_data(char *srv, xcom_port port, app_data_ptr a)
{
	pax_msg * msg = pax_msg_new(null_synode, 0);
	envelope *e = calloc(1, sizeof(envelope));

	msg->a = a;
	msg->to = VOID_NODE_NO;
	msg->op = client_msg;
	e->srv = strdup(srv);
	e->port = port;
	e->p = msg;
	e->crash_on_error = 0;
	task_new(client_task, void_arg(e), "client_task", XCOM_THREAD_DEBUG);
}

void send_client_boot(char *srv, xcom_port port, node_list *nl)
{
	send_client_app_data(srv, port, create_config(nl, unified_boot_type));
}

void send_client_add_node(char *srv, xcom_port port, node_list *nl)
{
	send_client_app_data(srv, port, create_config(nl, add_node_type));
}

void send_client_remove_node(char *srv, xcom_port port, node_list *nl)
{
	send_client_app_data(srv, port, create_config(nl, remove_node_type));
}

void send_client_config(char *srv, xcom_port port, node_list *nl)
{
	send_client_app_data(srv, port, create_config(nl, force_config_type));
}
/* purecov: end */

static void	server_send_snapshot(server *srv, site_def const *s, gcs_snapshot *gcs_snap, node_no node)
{
	pax_msg * p = pax_msg_new(gcs_snap->log_start, get_site_def());
	ref_msg(p);
	p->op = gcs_snapshot_op;
	p->gcs_snap = gcs_snap;
	send_msg(srv, s->nodeno, node, get_group_id(s), p);
	unref_msg(&p);
}

/* purecov: begin deadcode */
static void	send_snapshot(site_def const *s, gcs_snapshot *gcs_snap, node_no node)
{
	assert(s->servers[node]);
	server_send_snapshot(s->servers[node], s, gcs_snap, node);
}
/* purecov: end */

static void	server_push_log(server *srv, synode_no push, node_no node)
{
	site_def const *s = get_site_def();
	while (!synode_gt(push, get_max_synode())) {
		if (is_cached(push)) {
			pax_machine * p = get_cache_no_touch(push);
			if (pm_finished(p)) {
				/* Need to clone message here since pax_machine may be re-used while message is sent */
				pax_msg * pm = clone_pax_msg(p->learner.msg);
				if (pm != NULL) {
				  ref_msg(pm);
				  pm->op = recover_learn_op;
				  send_msg(srv, s->nodeno, node, get_group_id(s), pm);
				  unref_msg(&pm);
				}
			}
		}
		push = incr_synode(push);
	}
}
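
/*
	Illustrative sketch, not compiled as part of XCom: server_push_log()
	walks every synode from push up to get_max_synode(). It re-sends each
	slot that is both cached and finished as a recover_learn_op. The sketch
	differs from the real loop in two ways:

	  - It flattens synodes into a single array index; the real loop
	    advances with incr_synode().
	  - It records which positions would be sent instead of calling
	    send_msg().

	The sketch_* names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical cache slot state.
typedef struct {
	int cached;
	int finished;
} sketch_slot;

// Visits positions push..max_pos inclusive and writes the ones that
// would be re-sent into `out`. Returns how many were written.
static size_t sketch_push_log(const sketch_slot *slots, uint64_t push,
                              uint64_t max_pos, uint64_t *out)
{
	size_t n = 0;
	for (; push <= max_pos; push++) {
		if (slots[push].cached && slots[push].finished)
			out[n++] = push;
	}
	return n;
}
```
*/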

/* purecov: begin deadcode */
static void	push_log(synode_no push, node_no node)
{
	site_def const * s = get_site_def();
	assert(s->servers[node]);
	server_push_log(s->servers[node], push, node);
}
/* purecov: end */

static app_snap_getter get_app_snap;
static app_snap_handler handle_app_snap;

/* purecov: begin deadcode */
static void	handle_need_snapshot(site_def const *s, node_no node)
{
	gcs_snapshot * gs = export_config();
	synode_no app_lsn = get_app_snap(&gs->app_snap);
	if (!synode_eq(null_synode, app_lsn) && synode_lt(app_lsn, gs->log_start))
		gs->log_start = app_lsn;
	send_snapshot(s, gs, node);
	push_log(gs->log_start, node);
}
/* purecov: end */

static void	server_handle_need_snapshot(server *srv, site_def const *s, node_no node)
{
	gcs_snapshot * gs = export_config();
	synode_no app_lsn = get_app_snap(&gs->app_snap);
	if (!synode_eq(null_synode, app_lsn) && synode_lt(app_lsn, gs->log_start)){
		gs->log_start = app_lsn;
	}
	else if (!synode_eq(null_synode, last_config_modification_id)) {
		gs->log_start = last_config_modification_id;
	}

	server_send_snapshot(srv, s, gs, node);
	server_push_log(srv, gs->log_start, node);
}
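
/*
	Illustrative sketch, not compiled as part of XCom:
	server_handle_need_snapshot() picks where the pushed log starts. An
	application snapshot that is older than the exported configuration's
	log_start wins. Otherwise, if a configuration change has been recorded,
	the log starts at last_config_modification_id.

	The sketch reduces synodes to message numbers and uses 0 in place of
	null_synode; both are assumptions. sketch_pick_log_start() is
	hypothetical.

```c
#include <assert.h>
#include <stdint.h>

// Chooses the first message number to push to a node needing a snapshot.
static uint64_t sketch_pick_log_start(uint64_t config_log_start,
                                      uint64_t app_lsn,
                                      uint64_t last_config_change)
{
	if (app_lsn != 0 && app_lsn < config_log_start)
		return app_lsn;             // older application snapshot wins
	if (last_config_change != 0)
		return last_config_change;  // else replay from last config change
	return config_log_start;
}
```
*/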

#define X(b) #b,
const char *xcom_state_name[] = {
	x_state_list
};

const char *xcom_actions_name[] = {
	x_actions
};
#undef X
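/*
  A hypothetical illustration, not used by xcom: x_state_list and x_actions
  are X-macro lists. They are expanded above, with X(b) defined as #b, to
  build string tables. Assuming a list follows the usual X(...) entry
  convention, the same list can also be expanded into an enum. That keeps
  the enumerators and their printable names in sync. The demo_state_list,
  ds_* and demo_name names are invented for this sketch.

```c
#include <assert.h>
#include <string.h>

// Hypothetical list in the style of x_state_list: every entry is wrapped
// in X(...), so the same list can be expanded in several ways.
#define demo_state_list X(ds_start) X(ds_boot) X(ds_run)

// Expansion 1: one enumerator per entry, plus a trailing count.
#define X(b) b,
typedef enum { demo_state_list ds_count } demo_state;
#undef X

// Expansion 2: a parallel table of names built with the # operator,
// the same way xcom_state_name is built.
#define X(b) #b,
static const char *demo_state_name[] = { demo_state_list };
#undef X

// Look up the printable name of a state.
static const char *demo_name(demo_state s)
{
	assert((unsigned)s < (unsigned)ds_count);
	return demo_state_name[s];
}
```

  Because both expansions come from one list, adding a state only requires
  editing the list itself.
*/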

xcom_state xcom_fsm(xcom_actions action, task_arg fsmargs)
{
	static int	state = 0;
	G_DEBUG("state %d action %s", state, xcom_actions_name[action]);
	switch (state) {
	default:
		assert(state == 0);
	case 0:
		/* Initialize basic xcom data */
		xcom_thread_init();
start:
		for (; ; ) {
			if (action == xa_init) {
				xcom_shutdown = 0;
				sent_alive = 0.0;
				oom_abort = 0;
			}
			if (action == xa_u_boot) {
/* purecov: begin deadcode */
				node_list * nl = get_void_arg(fsmargs);
				app_data_ptr a = create_config(nl, unified_boot_type);
				install_node_group(a); 	/* Cannot get consensus unless group is known */
				send_app_data(a);
				set_executed_msg(incr_msgno(get_site_def()->start));
				goto run;
/* purecov: end */
			}
			if (action == xa_add) {
/* purecov: begin deadcode */
				add_args * a = get_void_arg(fsmargs);
				send_client_add_node(a->addr, a->port, a->nl);
/* purecov: end */
			}
			if (action == xa_net_boot) {
				app_data * a = get_void_arg(fsmargs);
				install_node_group(a);
				set_executed_msg(incr_msgno(get_site_def()->start));
				goto run;
			}
			if (action == xa_snapshot) {
				goto recover;
			}
			if (action == xa_exit) {
				/* Xcom is finished when we get here */
				bury_site(get_group_id(get_site_def()));
				task_terminate_all();    /* Kill, kill, kill, kill, kill, kill. This is the end. */

				init_xcom_base();        /* Reset shared variables */
				init_tasks();            /* Reset task variables */
				free_site_defs();
				free_forced_config_site_def();
				garbage_collect_servers();
				DBGOUT(FN; STRLIT("shutting down"));
				xcom_shutdown = 1;
				if(xcom_exit_cb)
					xcom_exit_cb(get_int_arg(fsmargs));
				G_DEBUG("Exiting xcom thread");
			}
			CO_RETURN(x_start);
		}
recover:
		 {
			pax_msg * p = get_void_arg(fsmargs);
			import_config(p->gcs_snap);
			handle_app_snap(&p->gcs_snap->app_snap);
			set_executed_msg(p->gcs_snap->log_start);

			set_last_received_config(p->gcs_snap->log_start);

			DBGOUT(FN; SYCEXP(executed_msg); );
			for (; ; ) {
				if (action == xa_terminate) {
					goto start;
				}
				if (action == xa_complete) {
					goto run;
				}

				CO_RETURN(x_recover);
			}
		}
run:
		DBGOUT(FN; SYCEXP(executed_msg); );
		if(xcom_run_cb)
			xcom_run_cb(0);
		force_recover = 0;
		client_boot_done = 1;
		netboot_ok = 1;
		booting = 0;
		set_proposer_startpoint();
		create_proposers();
		set_task(&executor, task_new(executor_task, null_arg, "executor_task", XCOM_THREAD_DEBUG));
		set_task(&sweeper, task_new(sweeper_task, null_arg, "sweeper_task", XCOM_THREAD_DEBUG));
		set_task(&detector, task_new(detector_task, null_arg, "detector_task", XCOM_THREAD_DEBUG));
		set_task(&alive_t, task_new(alive_task, null_arg, "alive_task", XCOM_THREAD_DEBUG));

		for (; ; ) {
			if (action == xa_terminate) {
				force_recover = 0;
				client_boot_done = 0;
				netboot_ok = 0;
				booting = 0;
				oom_abort = 0;
				terminate_proposers();
				init_proposers();
				task_terminate(executor);
				set_task(&executor, NULL);
				task_terminate(sweeper);
				set_task(&sweeper, NULL);
				task_terminate(detector);
				set_task(&detector, NULL);
				task_terminate(alive_t);
				set_task(&alive_t, NULL);

				init_xcom_base();             /* Reset shared variables */
				free_site_defs();
				free_forced_config_site_def();
				garbage_collect_servers();
				if(xcom_terminate_cb)
					xcom_terminate_cb(get_int_arg(fsmargs));
				goto start;
			}
			if (action == xa_need_snapshot) {
				pax_msg * p = get_void_arg(fsmargs);
				handle_need_snapshot(find_site_def(p->synode), p->from);
			}
			if (action == xa_force_config) {
				app_data * a = get_void_arg(fsmargs);
				site_def *s = create_site_def_with_start(a, executed_msg);
				s->boot_key = executed_msg;
				invalidate_servers(get_site_def(), s);
				start_force_config(s);
			}
			CO_RETURN(x_run);
		}
	}
}
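/*
  Sketch under stated assumptions: xcom_fsm stores its position in a
  function-local static `state` and leaves through CO_RETURN. The next call
  therefore resumes inside whichever loop (start, recover or run) returned
  last. CO_RETURN is defined elsewhere, so the macros below are a
  hypothetical stand-in modeled on the well-known switch-based coroutine
  technique and may differ from the real definition. demo_fsm, fsm_state
  and fsm_action are invented for this example.

```c
#include <assert.h>

// Hypothetical coroutine macros (switch-based "coroutines in C" style).
#define CO_DEMO_BEGIN(st) switch (st) { case 0:
#define CO_DEMO_RETURN(st, x) do { (st) = __LINE__; return (x); case __LINE__:; } while (0)
#define CO_DEMO_END }

typedef enum { fsm_start, fsm_run } fsm_state;
typedef enum { fsm_act_init, fsm_act_boot, fsm_act_terminate } fsm_action;

// Each call consumes one action and resumes right after the
// CO_DEMO_RETURN that ended the previous call.
static fsm_state demo_fsm(fsm_action action)
{
	static int state = 0;
	CO_DEMO_BEGIN(state);
start:
	for (;;) {
		if (action == fsm_act_boot)
			goto run;
		CO_DEMO_RETURN(state, fsm_start);
	}
run:
	for (;;) {
		if (action == fsm_act_terminate)
			goto start;
		CO_DEMO_RETURN(state, fsm_run);
	}
	CO_DEMO_END;
	return fsm_start;  // only reached if state holds an unknown value
}
```

  Every CO_DEMO_RETURN records its own line number as a case label, so the
  switch at the top jumps straight back into the loop that returned. This
  is why xcom_fsm can be written as straight-line code with goto between
  phases.
*/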

/* purecov: begin deadcode */
void xcom_add_node(char *addr, xcom_port port, node_list *nl)
{
	if (xcom_mynode_match(addr, port)) {
		XCOM_FSM(xa_u_boot, void_arg(nl)); /* Boot */
	} else {
		add_args a;
		a.addr = addr;
		a.port = port;
		a.nl = nl;
		XCOM_FSM(xa_add, void_arg(&a)); /* Only initialize xcom */
	}
}


void xcom_fsm_add_node(char *addr, node_list *nl)
{
	xcom_port	node_port = xcom_get_port(addr);
	char	*node_addr = xcom_get_name(addr);

	if (xcom_mynode_match(node_addr, node_port)) {
		node_list x_nl;
		x_nl.node_list_len = 1;
		x_nl.node_list_val = new_node_address(x_nl.node_list_len, &addr);
		XCOM_FSM(xa_u_boot, void_arg(&x_nl));
		delete_node_address(x_nl.node_list_len, x_nl.node_list_val);
	} else {
		add_args a;
		a.addr = node_addr;
		a.port = node_port;
		a.nl = nl;
		XCOM_FSM(xa_add, void_arg(&a));
	}
	free(node_addr);
}
/* purecov: end */

void set_app_snap_handler(app_snap_handler x)
{
	handle_app_snap = x;
}

void set_app_snap_getter(app_snap_getter x)
{
	get_app_snap = x;
}

/* Initialize sockaddr based on server and port */
static int	init_sockaddr(char *server, struct sockaddr_in *sock_addr,
                          socklen_t *sock_size, xcom_port port)
{
	/* Get address of server */
	struct addrinfo *addr = 0;

	checked_getaddrinfo(server, 0, 0, &addr);

	if (addr == 0) {
		return 0;
	}

	/* Copy first address */
	memcpy(sock_addr, addr->ai_addr, addr->ai_addrlen);
	*sock_size = addr->ai_addrlen;
	sock_addr->sin_port = htons(port);
	freeaddrinfo(addr);

	return 1;
}
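/*
  A minimal sketch, assuming only standard POSIX calls: init_sockaddr copies
  the first getaddrinfo result into a struct sockaddr_in. That is safe as
  long as checked_getaddrinfo returns only IPv4 addresses, but its default
  hints are not visible in this file. The hypothetical demo_init_sockaddr
  below makes the assumption explicit. It requests AF_INET and checks
  ai_addrlen before the copy.

```c
#define _POSIX_C_SOURCE 200112L
#include <assert.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>

// Resolve host to an IPv4 address and store it, with the given port,
// in *out. Returns 1 on success, 0 on failure.
static int demo_init_sockaddr(const char *host, struct sockaddr_in *out,
                              socklen_t *out_len, unsigned short port)
{
	struct addrinfo hints;
	struct addrinfo *res = NULL;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = AF_INET;        // never hand back an IPv6 address
	hints.ai_socktype = SOCK_STREAM;
	if (getaddrinfo(host, NULL, &hints, &res) != 0 || res == NULL)
		return 0;
	if (res->ai_addrlen > sizeof(*out)) {  // would not fit: refuse
		freeaddrinfo(res);
		return 0;
	}
	memcpy(out, res->ai_addr, res->ai_addrlen);  // first result only
	*out_len = res->ai_addrlen;
	out->sin_port = htons(port);
	freeaddrinfo(res);
	return 1;
}
```
*/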


static result	checked_create_socket(int domain, int type, int protocol)
{
	result	retval = {0,0};
	int	retry = 1000;

	do {
		SET_OS_ERR(0);
		retval.val = socket(domain, type, protocol);
		retval.funerr = to_errno(GET_OS_ERR);
	} while (--retry && retval.val == -1 && (from_errno(retval.funerr) == SOCK_EAGAIN));

	if (retval.val == -1) {
		task_dump_err(retval.funerr);
#if defined (WIN32) || defined (WIN64)
		G_MESSAGE("Socket creation failed with error %d.",
			retval.funerr);
#else
		G_MESSAGE("Socket creation failed with error %d - %s.",
			retval.funerr, strerror(retval.funerr));
#endif
		abort();
	}
	return retval;
}

/* Read at most n bytes from connection rfd into buffer buf, retrying transient errors */
static result	socket_read(connection_descriptor*rfd, void *buf, int n)
{
	result 	ret = {0,0};

	assert(n >= 0);

	do {
	  ret = con_read(rfd, buf, n);
	  task_dump_err(ret.funerr);
	} while (ret.val < 0 && can_retry_read(ret.funerr));
	assert(!can_retry_read(ret.funerr));
	return ret;
}


/* Read exactly n bytes from connection rfd into buffer p.
   Returns n on success, 0 on end of stream, and -1 on error. */
static int64_t	socket_read_bytes(connection_descriptor *rfd, char *p, uint32_t n)
{
	uint32_t	left= n;
	char	*bytes= p;

	result	nread = {0,0};

	while (left > 0) {
		/*
		  socket_read takes an int length, so never ask it for more
		  than INT_MAX bytes in a single call.
		*/
		int r = (int) MIN(left, INT_MAX);

		nread = socket_read(rfd, bytes, r);
		if (nread.val == 0) {
			return 0;
		} else if (nread.val < 0) {
			return -1;
		} else {
			bytes += nread.val;
			left -= (uint32_t)nread.val;
		}
	}
	assert(left == 0);
	return n;
}
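/*
  A simplified sketch of the loop above: socket_read_bytes keeps reading
  because a single read may return fewer bytes than requested. Below, the
  same loop runs over a hypothetical read callback (demo_read_fn) instead of
  a socket. A fake stream that hands out at most 3 bytes per call makes the
  short-read path easy to exercise. The return convention matches
  socket_read_bytes: n on success, 0 on end of stream, -1 on error.

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>
#include <string.h>

// Hypothetical reader: bytes read (>0), 0 on end of stream, <0 on error.
typedef int (*demo_read_fn)(void *ctx, char *buf, int n);

// Keep calling rd until exactly n bytes have arrived in p.
static int64_t demo_read_exact(demo_read_fn rd, void *ctx, char *p, uint32_t n)
{
	uint32_t left = n;

	while (left > 0) {
		int chunk = left > INT_MAX ? INT_MAX : (int)left;
		int r = rd(ctx, p, chunk);
		if (r == 0)
			return 0;   // peer closed before n bytes arrived
		if (r < 0)
			return -1;  // hard error
		p += r;
		left -= (uint32_t)r;
	}
	assert(left == 0);
	return n;
}

// Fake stream for testing: serves its data at most 3 bytes per call.
typedef struct { const char *data; size_t len; size_t pos; } demo_stream;

static int demo_trickle_read(void *ctx, char *buf, int n)
{
	demo_stream *s = ctx;
	size_t avail = s->len - s->pos;
	size_t k = (size_t)n;

	if (k > 3)
		k = 3;
	if (k > avail)
		k = avail;
	memcpy(buf, s->data + s->pos, k);
	s->pos += k;
	return (int)k;
}
```
*/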

/* Write all n bytes from buffer _buf to connection wfd, retrying transient errors.
   Returns n on success and -1 on error. */
static int64_t	socket_write(connection_descriptor *wfd, void *_buf, uint32_t n)
{
	char	*buf = (char*) _buf;
	result 	ret = {0,0};

	uint32_t	total; /* Keeps track of number of bytes written so far */

	total = 0;
	while (total < n) {
		int w= (int) MIN(n - total, INT_MAX);

		while ((ret = con_write(wfd, buf + total, w)).val < 0 &&
			   can_retry_write(ret.funerr)) {
			task_dump_err(ret.funerr);
			DBGOUT(FN; STRLIT("retry "); NEXP(total, d); NEXP(n, d));
		}
		if (ret.val <= 0) { /* Something went wrong */
			task_dump_err(ret.funerr);
			return -1;
		} else {
			total += (uint32_t)ret.val; /* Add number of bytes written to total */
		}
	}
	DBGOUT(FN; NEXP(total, u); NEXP(n, u));
	assert(total == n);
	return(total);
}
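/*
  A minimal sketch of the same write-all pattern, with hypothetical names:
  demo_write_all resends a chunk while the writer reports a transient
  failure and stops on a hard error. It returns n on success and -1 on
  error, like socket_write. A transient failure stands in for what
  can_retry_write() accepts. The fake writer accepts at most 2 bytes per
  call and fails transiently a configurable number of times first.

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>
#include <string.h>

// Hypothetical writer: bytes written (>0) or <0 on error; it always sets
// *retry, nonzero meaning the error is transient.
typedef int (*demo_write_fn)(void *ctx, const char *buf, int n, int *retry);

static int64_t demo_write_all(demo_write_fn wr, void *ctx, const char *buf, uint32_t n)
{
	uint32_t total = 0;

	while (total < n) {
		int w = (n - total) > INT_MAX ? INT_MAX : (int)(n - total);
		int retry = 0;
		int ret;

		do {
			ret = wr(ctx, buf + total, w, &retry);
		} while (ret < 0 && retry);  // transient error: resend the same chunk
		if (ret <= 0)
			return -1;
		total += (uint32_t)ret;
	}
	assert(total == n);
	return total;
}

// Fake sink for testing.
typedef struct { char out[32]; size_t len; int hiccups; } demo_sink;

static int demo_slow_write(void *ctx, const char *buf, int n, int *retry)
{
	demo_sink *s = ctx;
	int k;

	if (s->hiccups > 0) {  // simulate a transient failure
		s->hiccups--;
		*retry = 1;
		return -1;
	}
	*retry = 0;
	k = n < 2 ? n : 2;
	if ((size_t)k > sizeof(s->out) - s->len)
		return -1;  // sink full: hard error
	memcpy(s->out + s->len, buf, (size_t)k);
	s->len += (size_t)k;
	return k;
}
```
*/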

static inline result xcom_close_socket(int *sock)
{
	result res = {0,0};
	if (*sock != -1) {
		do {
			SET_OS_ERR(0);
			res.val = CLOSESOCKET(*sock);
			res.funerr = to_errno(GET_OS_ERR);
		} while (res.val == -1 && from_errno(res.funerr) == SOCK_EINTR);
		*sock = -1;
	}
	return res;
}

static inline result xcom_shut_close_socket(int *sock)
{
	result res = {0,0};
	if (*sock >= 0) {
#if defined (WIN32) || defined (WIN64)
		static LPFN_DISCONNECTEX DisconnectEx = NULL;
		if (DisconnectEx == NULL)
		{
			DWORD dwBytesReturned;
			GUID guidDisconnectEx = WSAID_DISCONNECTEX;
			WSAIoctl(*sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
				&guidDisconnectEx, sizeof(GUID),
				&DisconnectEx, sizeof(DisconnectEx),
				&dwBytesReturned, NULL, NULL);
		}
		if (DisconnectEx != NULL)
		{
			/* Result is ignored: the socket is closed below either way */
			DisconnectEx(*sock, (LPOVERLAPPED)NULL, (DWORD)0, (DWORD)0);
		}
		else
#endif
		shutdown(*sock, _SHUT_RDWR);
		res = xcom_close_socket(sock);
	}
	return res;
}

#define CONNECT_FAIL do { ret_fd = -1; goto end; } while (0)

static int timed_connect(int fd, sockaddr *sock_addr, socklen_t sock_size)
{
  int timeout = 10000;
  int ret_fd = fd;
  int syserr;
  int sysret;
  struct pollfd fds;
#ifdef WITH_LOG_DEBUG
  char buf[SYS_STRERROR_SIZE];
#endif

  fds.fd = fd;
  fds.events = POLLOUT;
  fds.revents = 0;

  /* Set non-blocking */
  if (unblock_fd(fd) < 0)
    return -1;

  /* Trying to connect with timeout */
  SET_OS_ERR(0);
  sysret = connect(fd, sock_addr, sock_size);

  if (is_socket_error(sysret)) {
    syserr = GET_OS_ERR;
    /* If the error is SOCK_EWOULDBLOCK or SOCK_EINPROGRESS or SOCK_EALREADY,
     * wait. */
    switch (syserr) {
      case SOCK_EWOULDBLOCK:
      case SOCK_EINPROGRESS:
      case SOCK_EALREADY:
        break;
      default:
        G_DEBUG("connect - Error connecting (socket=%d, error=%d).",
                fd, syserr);
        CONNECT_FAIL;
    }

    SET_OS_ERR(0);
    while ((sysret = poll(&fds, 1, timeout)) < 0) {
      syserr = GET_OS_ERR;
      if (syserr != SOCK_EINTR && syserr != SOCK_EINPROGRESS) break;
      SET_OS_ERR(0);
    }
    MAY_DBG(FN; STRLIT("poll - Finished. "); NEXP(sysret, d));

    if (sysret == 0) {
      G_DEBUG("Timed out while waiting for connection to be established! "
              "Canceling connection attempt. (socket=%d)",
              fd);
      CONNECT_FAIL;
    }

    if (is_socket_error(sysret)) {
      G_DEBUG("poll - Error while connecting! (socket= %d, error=%d)",
              fd, syserr);
      CONNECT_FAIL;
    }

    {
      int socket_errno = 0;
      socklen_t socket_errno_len = sizeof(socket_errno);

      if ((fds.revents & POLLOUT) == 0) {
        MAY_DBG(FN; STRLIT("POLLOUT not set - Socket failure!"););
        ret_fd = -1;
      }

      if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) {
        MAY_DBG(FN;
                STRLIT("POLLERR | POLLHUP | POLLNVAL set - Socket failure!"););
        ret_fd = -1;
      }

      if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &socket_errno,
                     &socket_errno_len) != 0) {
        G_DEBUG("getsockopt socket %d failed.", fd);
        ret_fd = -1;
      } else {
        if (socket_errno != 0) {
          G_DEBUG("Connection to socket %d failed with error %d - %s.", fd,
                  socket_errno, strerr_msg(buf, sizeof(buf), socket_errno));
          ret_fd = -1;
        }
      }
    }
  }

end:
  /* Set blocking */
  SET_OS_ERR(0);
  if(block_fd(fd) < 0) {
    G_DEBUG(
        "Unable to set socket back to blocking state. (socket=%d, error=%d).",
        fd, GET_OS_ERR);
    return -1;
  }

  return ret_fd;
}
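/*
  The technique in timed_connect, condensed into a hypothetical POSIX-only
  sketch: start a non-blocking connect(), poll() for POLLOUT with a timeout,
  then read SO_ERROR to learn whether the connection actually succeeded.
  demo_timed_connect handles only EINPROGRESS, not the Windows
  SOCK_EWOULDBLOCK case. It uses fcntl() directly instead of unblock_fd()
  and block_fd(). Like the original, it restarts the full timeout after
  EINTR.

```c
#define _POSIX_C_SOURCE 200112L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

// Connect fd to addr, waiting at most timeout_ms milliseconds. Returns 0
// on success, -1 on error or timeout; fd's original mode is restored.
static int demo_timed_connect(int fd, const struct sockaddr *addr,
                              socklen_t len, int timeout_ms)
{
	int rc = -1;
	int flags = fcntl(fd, F_GETFL, 0);

	if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
		return -1;

	if (connect(fd, addr, len) == 0) {
		rc = 0;  // connected immediately
	} else if (errno == EINPROGRESS) {
		struct pollfd pfd;
		int n;

		pfd.fd = fd;
		pfd.events = POLLOUT;
		pfd.revents = 0;
		do {
			n = poll(&pfd, 1, timeout_ms);  // 0 means timeout
		} while (n < 0 && errno == EINTR);

		if (n == 1) {
			int err = 0;
			socklen_t elen = sizeof(err);
			// Writability alone is not success: SO_ERROR holds the
			// final result of the connection attempt.
			if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &elen) == 0 &&
			    err == 0)
				rc = 0;
		}
	}

	if (fcntl(fd, F_SETFL, flags) < 0)  // restore the original mode
		rc = -1;
	return rc;
}
```
*/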


/* Connect to server on given port */
static connection_descriptor*	connect_xcom(char *server, xcom_port port)
{
	result fd = {0,0};
	result ret = {0,0};
	struct sockaddr_in sock_addr;
	socklen_t sock_size;
#ifdef WITH_LOG_DEBUG
	char buf[SYS_STRERROR_SIZE];
#endif

	DBGOUT(FN; STREXP(server); NEXP(port, d));
	G_DEBUG("connecting to %s %d", server, port);
	/* Create socket */
	if ((fd = checked_create_socket(AF_INET, SOCK_STREAM, 0)).val < 0) {
		G_DEBUG("Error creating sockets.");
		return NULL;
	}

	/* Get address of server */
	if (!init_sockaddr(server, &sock_addr, &sock_size, port)) {
		xcom_close_socket(&fd.val);
		G_DEBUG("Error initializing socket addresses.");
		return NULL;
	}

	/* Connect socket to address */

	SET_OS_ERR(0);
	if (timed_connect(fd.val, (struct sockaddr *)&sock_addr, sock_size) == -1) {
		fd.funerr = to_errno(GET_OS_ERR);
		G_DEBUG("Connecting socket to address %s on port %d failed with error %d - %s.",
				server, port, fd.funerr, strerr_msg(buf, sizeof(buf), fd.funerr));
		xcom_close_socket(&fd.val);
		return NULL;
	}

	{
		int	peer = 0;
		/* Sanity check before return */
		SET_OS_ERR(0);
		ret.val = peer = getpeername(fd.val, (struct sockaddr *)&sock_addr,
		                             &sock_size);
		ret.funerr = to_errno(GET_OS_ERR);
		if (peer >= 0) {
			ret = set_nodelay(fd.val);
			if(ret.val < 0){
				task_dump_err(ret.funerr);
				xcom_shut_close_socket(&fd.val);
#if defined (WIN32) || defined (WIN64)
				G_DEBUG("Setting TCP_NODELAY failed while connecting to %s with error %d.",
						server, ret.funerr);
#else
				G_DEBUG("Setting TCP_NODELAY failed while connecting to %s with error %d - %s.",
						server, ret.funerr, strerror(ret.funerr));
#endif
				return NULL;
			}
			G_DEBUG("client connected to %s %d fd %d", server, port, fd.val);
		} else {
			/* Something is wrong */
			socklen_t errlen = sizeof(ret.funerr);
			DBGOUT(FN; STRLIT("getpeername failed"); );
			if (ret.funerr) {
				DBGOUT(FN; NEXP(from_errno(ret.funerr), d);
					   STRLIT(strerror(from_errno(ret.funerr))));
			}
			getsockopt(fd.val, SOL_SOCKET, SO_ERROR, (void *) & ret.funerr, &errlen);
			if (ret.funerr == 0) {
				ret.funerr = to_errno(SOCK_ECONNREFUSED);
			}
			xcom_shut_close_socket(&fd.val);
#if defined (WIN32) || defined (WIN64)
			G_DEBUG("Getting the peer name failed while connecting to server %s with error %d.",
					server, ret.funerr);
#else
			G_DEBUG("Getting the peer name failed while connecting to server %s with error %d - %s.",
					server, ret.funerr, strerror(ret.funerr));
#endif
			return NULL;
		}

#ifdef XCOM_HAVE_OPENSSL
		if (xcom_use_ssl()) {
			connection_descriptor *cd = 0;
			SSL * ssl = SSL_new(client_ctx);
			G_DEBUG("Trying to connect using SSL.");
			SSL_set_fd(ssl, fd.val);

			ERR_clear_error();
			ret.val = SSL_connect(ssl);
			ret.funerr = to_ssl_err(SSL_get_error(ssl, ret.val));

			if (ret.val != SSL_SUCCESS) {
				G_MESSAGE("Error connecting using SSL %d %d.",
					  ret.funerr, SSL_get_error(ssl, ret.val));
				task_dump_err(ret.funerr);
				SSL_shutdown(ssl);
				SSL_free(ssl);
				xcom_shut_close_socket(&fd.val);
				return NULL;
			}
			DBGOUT(FN; STRLIT("ssl connected to "); STRLIT(server); NDBG(port,d); NDBG(fd.val, d); PTREXP(ssl));

			if (ssl_verify_server_cert(ssl, server))
			{
				G_MESSAGE("Error validating certificate and peer.");
				task_dump_err(ret.funerr);
				SSL_shutdown(ssl);
				SSL_free(ssl);
				xcom_shut_close_socket(&fd.val);
				return NULL;
			}

			cd = new_connection(fd.val, ssl);
			set_connected(cd, CON_FD);
			G_DEBUG("Success connecting using SSL.");
			return cd;
		} else {
			connection_descriptor *cd = new_connection(fd.val, 0);
			set_connected(cd, CON_FD);
			return cd;
		}
#else
		{
			connection_descriptor *cd = new_connection(fd.val);
			set_connected(cd, CON_FD);
			return cd;
		}
#endif
	}
}

connection_descriptor*	xcom_open_client_connection(char *server, xcom_port port)
{
	return connect_xcom(server, port);
}

/* Send a protocol negotiation message on connection con */
static int	xcom_send_proto(connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag)
{
	char	buf[MSG_HDR_SIZE];
	memset(buf, 0, MSG_HDR_SIZE);

	if (con->fd >= 0) {
		con->snd_tag = tag;
		write_protoversion(VERS_PTR((unsigned char*) buf), x_proto);
		put_header_1_0((unsigned char*) buf, 0, x_type, tag);
		{
			int	sent;
			sent = (int)socket_write(con, buf, MSG_HDR_SIZE);
			if (con->fd < 0) {
				return -1;
			}
			return sent;
		}
	} else {
		return -1;
	}
}

static int	xcom_recv_proto(connection_descriptor * rfd, xcom_proto *x_proto, x_msg_type *x_type, unsigned int *tag)
{
	int	n;
	unsigned char	header_buf[MSG_HDR_SIZE];
	uint32_t	msgsize;

	/* Read the header: protocol version, length, type, and tag */
	n = (int)socket_read_bytes(rfd, (char*)header_buf, MSG_HDR_SIZE);

	if (n != MSG_HDR_SIZE) {
		DBGOUT(FN; NDBG(n, d));
		return -1;
	}

	*x_proto = read_protoversion(VERS_PTR(header_buf));
	get_header_1_0(header_buf, &msgsize, x_type, tag);

	return n;
}

#define TAG_START 313

static int64_t	xcom_send_client_app_data(connection_descriptor *fd, app_data_ptr a, int force)
{
	pax_msg * msg = pax_msg_new(null_synode, 0);
	uint32_t	buflen = 0;
	char	*buf = 0;
	int64_t retval= 0;

	if(! proto_done(fd)){
		xcom_proto x_proto;
		x_msg_type x_type;
		unsigned int tag;
		retval = xcom_send_proto(fd, my_xcom_version, x_version_req, TAG_START);
		G_DEBUG("client sent negotiation request for protocol %d",my_xcom_version);
		if(retval < 0)
			goto end;
		retval = xcom_recv_proto(fd, &x_proto, &x_type, &tag);
		if(retval < 0)
			goto end;
		if(tag != TAG_START)
		{
			retval = -1;
			goto end;
		}
		if(x_type != x_version_reply)
		{
			retval = -1;
			goto end;
		}

		if(x_proto == x_unknown_proto){
			G_DEBUG("no common protocol, returning error");
			retval = -1;
			goto end;
		}
		G_DEBUG("client connection will use protocol version %d",x_proto);
		DBGOUT(STRLIT("client connection will use protocol version ");
			   NDBG(x_proto,u); STRLIT(xcom_proto_to_str(x_proto)));
		fd->x_proto = x_proto;
		set_connected(fd, CON_PROTO);
	}
	msg->a = a;
	msg->to = VOID_NODE_NO;
	msg->op = client_msg;
	msg->force_delivery = force;

	serialize_msg(msg, fd->x_proto, &buflen, &buf);
	if(buflen){
		retval = socket_write(fd, buf, buflen);
		if (buflen != retval) {
			DBGOUT(FN; STRLIT("write failed "); NDBG(fd->fd, d);
				   NDBG(buflen, d); NDBG(retval, d));
		}
		X_FREE(buf);
	}
end:
	msg->a = 0; /* Do not deallocate a */
	XCOM_XDR_FREE(xdr_pax_msg, msg);
	return retval;
}
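/*
  A hypothetical sketch of the handshake checks: before the first message on
  a connection, xcom_send_client_app_data sends x_version_req with tag
  TAG_START. It accepts the reply only if three conditions hold: the tag is
  echoed back, the type is x_version_reply, and the agreed version is not
  x_unknown_proto. demo_check_reply below isolates those three checks. It
  uses stand-in types (demo_proto, demo_msg_type, DEMO_TAG_START) rather
  than the real xcom definitions.

```c
#include <assert.h>

// Hypothetical stand-ins for xcom_proto and x_msg_type.
typedef enum { demo_unknown_proto = 0, demo_proto_v1 = 1, demo_proto_v2 = 2 } demo_proto;
typedef enum { demo_version_req, demo_version_reply, demo_normal } demo_msg_type;

#define DEMO_TAG_START 313

// Returns the agreed protocol, or demo_unknown_proto if the reply must
// be rejected and the connection treated as failed.
static demo_proto demo_check_reply(demo_proto proto, demo_msg_type type,
                                   unsigned int tag)
{
	if (tag != DEMO_TAG_START)
		return demo_unknown_proto;  // reply to some other request
	if (type != demo_version_reply)
		return demo_unknown_proto;  // not a negotiation reply
	return proto;  // may itself be demo_unknown_proto: no common version
}
```
*/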

int64_t	xcom_client_send_data(uint32_t size, char *data, connection_descriptor *fd)
{
	app_data a;
	int64_t retval = 0;
	init_app_data(&a);
	a.body.c_t = app_type;
	a.body.app_u_u.data.data_len = size;
	a.body.app_u_u.data.data_val = data;
	retval = xcom_send_client_app_data(fd, &a, 0);
	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
	return retval;
}

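/* Read one message from connection rfd and deserialize it into p.
   Returns p on success and 0 on failure. Reads should be buffered as well. */
static pax_msg *	socket_read_msg(connection_descriptor *rfd, pax_msg *p)
{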
	int64_t	n;
	char	*bytes;
	unsigned char header_buf[MSG_HDR_SIZE];
	xcom_proto x_version;
	uint32_t	msgsize;
	x_msg_type x_type;
	unsigned int tag;
	int deserialize_ok = 0;

	bytes = NULL;

	/* Read version, length, type, and tag */
	n = socket_read_bytes(rfd, (char*)header_buf, MSG_HDR_SIZE);

	if (n <= 0) {
		DBGOUT(FN; NDBG(n, ll));
		return 0;
	}
	assert(n == MSG_HDR_SIZE);
	x_version = get_32(VERS_PTR(header_buf));
	/* Check the protocol version before doing anything else */
#ifdef XCOM_PARANOID
	assert(check_protoversion(x_version, rfd->x_proto));
#endif
	if (!check_protoversion(x_version, rfd->x_proto)) {
		return 0;
	}

	/* OK, we can grok this version */

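	get_header_1_0(header_buf, &msgsize, &x_type, &tag);

	/* Allocate buffer space for message */
	bytes = calloc(1, msgsize);
	if (bytes == NULL) {
		/* Allocation failed: treat it like a failed read */
		DBGOUT(FN; STRLIT("failed to allocate message buffer"); NDBG(msgsize, u));
		return 0;
	}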

	/* Read message */
	n = socket_read_bytes(rfd, bytes, msgsize);

	if (n > 0) {
		/* Deserialize message */
		deserialize_ok = deserialize_msg(p, rfd->x_proto, bytes, msgsize);
		MAY_DBG(FN; STRLIT(" deserialized message"));
	}
	/* Deallocate buffer */
	X_FREE(bytes);
	if (n <= 0 || deserialize_ok == 0) {
		DBGOUT(FN; NDBG(n, ll));
		return 0;
	}
	return(p);
}

int	xcom_close_client_connection(connection_descriptor *connection)
{
	int	retval = 0;

#ifdef XCOM_HAVE_OPENSSL
	if (connection->ssl_fd) {
		SSL_shutdown(connection->ssl_fd);
		ssl_free_con(connection);
	}
#endif
	retval =  xcom_shut_close_socket(&connection->fd).val;
	free(connection);
	return retval;
}

int	xcom_client_boot(connection_descriptor *fd, node_list *nl, uint32_t group_id)
{
	app_data a;
	int retval = 0;
	retval =  (int)xcom_send_client_app_data(fd, init_config_with_group(&a, nl, unified_boot_type, group_id), 0);
	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
	return retval;
}

int xcom_send_app_wait(connection_descriptor *fd, app_data *a, int force)
{
	int retval = 0;
	int retry_count = 10; // Same as 'connection_attempts'
	pax_msg p;
	pax_msg *rp = 0;

	do {
		retval = (int)xcom_send_client_app_data(fd, a, force);
		if(retval < 0)
			return 0;
		memset(&p, 0, sizeof(p));
		rp = socket_read_msg(fd, &p);
		if(rp){
			client_reply_code cli_err = rp->cli_err;
			my_xdr_free((xdrproc_t)xdr_pax_msg, (char*)&p);
			switch(cli_err){
				case REQUEST_OK:
					return 1;
				case REQUEST_FAIL:
					G_DEBUG("cli_err %d",cli_err);
					return 0;
				case REQUEST_RETRY:
					G_DEBUG("cli_err %d",cli_err);
					xcom_sleep(1);
					break;
				default:
					G_WARNING("client protocol botched");
					return 0;
			}
		}else{
			G_WARNING("read failed");
			return 0;
		}
	} while (--retry_count);
	// Timeout after REQUEST_RETRY has been received 'retry_count' times
	G_MESSAGE(
	 "Request failed: maximum number of retries (10) has been exhausted.");
	return 0;
}
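/*
  A minimal sketch of the bounded retry in xcom_send_app_wait, with
  hypothetical names: the request is resent while the server answers
  RETRY, for at most max_tries attempts. Any other non-OK reply, or a read
  failure, ends the loop at once. The real function also sleeps one second
  between attempts; that is omitted here so the sketch runs instantly. The
  test transport answers RETRY a fixed number of times and then OK.

```c
#include <assert.h>

// Hypothetical reply codes mirroring client_reply_code.
typedef enum { DEMO_OK, DEMO_FAIL, DEMO_RETRY, DEMO_READ_ERROR } demo_reply;

// Hypothetical transport: sends the request and returns the reply.
typedef demo_reply (*demo_send_fn)(void *ctx);

// Returns 1 if the request was accepted, 0 otherwise.
static int demo_send_wait(demo_send_fn send_fn, void *ctx, int max_tries)
{
	int tries = max_tries;

	assert(max_tries > 0);
	do {
		switch (send_fn(ctx)) {
		case DEMO_OK:
			return 1;
		case DEMO_RETRY:
			break;     // server busy: try again
		default:
			return 0;  // FAIL, read error, or unexpected code
		}
	} while (--tries);
	return 0;  // retries exhausted
}

// Test transport: answers RETRY while the counter is positive, then OK.
static demo_reply demo_retry_then_ok(void *ctx)
{
	int *retries_left = ctx;

	if (*retries_left > 0) {
		(*retries_left)--;
		return DEMO_RETRY;
	}
	return DEMO_OK;
}
```
*/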

int xcom_send_cfg_wait(connection_descriptor * fd, node_list *nl,
                       uint32_t group_id, cargo_type ct, int force)
{
	app_data a;
	int retval = 0;
	DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(nl)););
	retval = xcom_send_app_wait(fd, init_config_with_group(&a, nl, ct, group_id), force);
	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
	return retval;
}

int	xcom_client_add_node(connection_descriptor *fd, node_list *nl,
                         uint32_t group_id)
{
	return xcom_send_cfg_wait(fd, nl, group_id, add_node_type, 0);
}

int	xcom_client_remove_node(connection_descriptor *fd, node_list *nl,
                            uint32_t group_id)
{
	return xcom_send_cfg_wait(fd, nl, group_id, remove_node_type, 0);
}

#ifdef NOTDEF
/* Not fully implemented: these requests still need to be handled properly
   when received as client messages in dispatch_op, and they should have an
   opcode separate from normal add/remove, as force_config_type does. */
int xcom_client_force_add_node(connection_descriptor *fd, node_list *nl,
                               uint32_t group_id)
{
	return xcom_send_cfg_wait(fd, nl, group_id, add_node_type, 1);
}

int xcom_client_force_remove_node(connection_descriptor *fd, node_list *nl,
                                  uint32_t group_id)
{
	return xcom_send_cfg_wait(fd, nl, group_id, remove_node_type, 1);
}
#endif

int xcom_client_force_config(connection_descriptor *fd, node_list *nl,
                             uint32_t group_id)
{
	return xcom_send_cfg_wait(fd, nl, group_id, force_config_type, 1);
}

int	xcom_client_enable_arbitrator(connection_descriptor *fd)
{
	app_data a;
	int retval = 0;
	init_app_data(&a);
	a.body.c_t = enable_arbitrator;
	retval = xcom_send_app_wait(fd, &a, 0);
	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
	return retval;
}


int	xcom_client_disable_arbitrator(connection_descriptor *fd)
{
	app_data a;
	int retval = 0;
	init_app_data(&a);
	a.body.c_t = disable_arbitrator;
	retval = xcom_send_app_wait(fd, &a, 0);
	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
	return retval;
}

int	xcom_client_terminate_and_exit(connection_descriptor *fd)
{
	app_data a;
	int retval = 0;
	init_app_data(&a);
	a.body.c_t = x_terminate_and_exit;
	retval = xcom_send_app_wait(fd, &a, 0);
	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
	return retval;
}

int	xcom_client_set_cache_limit(connection_descriptor *fd, uint64_t cache_limit)
{
	app_data a;
	int retval = 0;
	init_app_data(&a);
	a.body.c_t = set_cache_limit;
	a.body.app_u_u.cache_limit = cache_limit;
	retval = xcom_send_app_wait(fd, &a, 0);
	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
	return retval;
}