neilconway / overlog-paxos

A clean implementation of the Paxos consensus protocol in Overlog, a language for distributed computing.

commit 29: 7ba2c54468af
parent 28: 8e5f83fc7ce8
branch: default
tags: tip
comments for source and sink
Peter Alvaro
4 months ago
r29:7ba2c54468af 164 loc 4.2 KB embed / history / annotate / raw /
program paxos_propose;

import java.lang.System;

define(max_proposal, keys(0, 1), {
  String,   // Master
  Integer,  // Sequence number
  Integer   // Max View number
});

public
max_proposal(Master, SeqNo, max<View>) :-
    paxos_prepare::prepare_oklist(Master, View, SeqNo, _, "Proposed", _, _);


/******** send_propose block *********************/
define(send_propose, keys(0, 1, 2), {
  String,   // Agent
  String,   // Master
  Integer,  // View number
  Integer,  // ARU
  String   // Update
}); 

// Constrained update
public
send_propose(@Agent, Master, MyView, Aru, Update) :-
    duty_cycle(@Master),
    paxos::parliament(@Master, Agent),
    max_proposal(@Master, SeqNo, View),
    paxos_prepare::quorum(@Master, MyView),
    paxos_prepare::local_aru(@Master, Aru),
    paxos_prepare::prepare_oklist(@Master, View, SeqNo, Update, "Proposed", _, _);

// dequeue on the duty cycle only, not on deltas to local_aru, etc.
timer(t, physical, 25, infinity, 0);
define(duty_cycle, {String});
duty_cycle(Me) :-
    paxos::self(Me),
    t();

// Unconstrained update
send_propose(@Agent, Master, View, Aru, Update) :-
    duty_cycle(@Master),
    paxos::parliament(@Master, Agent),
    notin paxos_prepare::prepare_oklist(@Master, View, _, _, "Proposed", _, _),
    paxos_prepare::quorum(@Master, View),
    leader::last_installed(@Master, View),
    paxos_prepare::local_aru(@Master, Aru),
    q(@Master, Update, R, Id),
    top_q(@Master, Id);

/************************************/

/********* queue block **************/

/* 
  A client application generates its own unique ids and inserts
  tuples into q().

  Reading back the update from global_history() confirms that the 
  it was successfully globally ordered.
*/

define(q, keys(0, 1, 2), {
  String,   // Agent
  String,   // Update
  String,   // Requestor
  Long      // Unique id
});

define(top_q, keys(0), {
  String,   // Agent
  Long      // Minimum id
});

public
top_q(Master, min<Id>) :-
    q(Master, _, _, Id);

delete 
q(Me, Update, Sender, Id) :-
    q(Me, Update, Sender, Id),
    globally_ordered(Me, _, _, Update);

/************************************/

/********* accept block **************/
public
paxos_prepare::accept(@Other, Agent, View, SeqNo, Update) :-   
    send_propose(@Agent, _, View, SeqNo, Update),
    notin paxos_prepare::global_history(@Agent, SeqNo, _, Update),
    paxos::parliament(@Agent, Other, _);

public
delete
paxos_prepare::accept(Agent, Master, View, SeqNo, Update) :-
    paxos_prepare::accept(Agent, Master, View, SeqNo, Update),
    paxos_prepare::accept(Agent, _, View2, SeqNo, _),
    View2 > View;

public
delete
paxos_prepare::accept(Agent, Master, View, SeqNo, Update) :-
    paxos_prepare::accept(Agent, Master, View, SeqNo, Update),
    globally_ordered(Agent, _, SeqNo, Update);

define(accept_cnt, keys(0, 1, 2), {
  String,   // Agent
  Integer,  // Other agent
  Integer,  // Sequence number
  Integer   // Count of accepts
});

public
accept_cnt(Me, View, SeqNo, count<Agent>) :-
    paxos_prepare::accept(Me, Agent, View, SeqNo, _);

/************************************/

define(globally_ordered, {
  String,   // Agent
  Integer,  // View number
  Integer,  // Sequence number
  String    // Update
});

globally_ordered(Me, View, SeqNo, Update) :-
    accept_cnt(Me, View, SeqNo, Cnt),
    paxos::priestCnt(Me, PCnt),
    Cnt > (PCnt / 2),
    send_propose(Me, _, View, SeqNo, Update);


/* actions taken on tables defined in earlier programs: */
leader::progress_timer(Me, Start, Duration) :-
    leader::progress_timer(Me, _, _),
    leader::progress_timer_start(Duration),
    paxos_prepare::global_history(Me, _, _, _),
    Start := System.currentTimeMillis();

define(hmax, keys(0), {
  String,   // Agent
  Integer   // Max sequence number
});

public
hmax(Agent, max<SeqNo>) :-
    paxos_prepare::global_history(Agent, SeqNo, _, _);

public
paxos_prepare::local_aru(Agent, SeqNo + 1) :-
    hmax(Agent, SeqNo);

public
paxos_prepare::global_history(Agent, SeqNo, Requestor, Update) :-
    globally_ordered(Agent, _, SeqNo, Update),
    Requestor := "?";

public
delete
paxos_prepare::prepare_oklist(Master, View, SeqNo, Update, Type, Len, Agent) :-
    paxos_prepare::prepare_oklist(Master, View, SeqNo, Update, Type, Len, Agent),
    globally_ordered(Master, _, _, Update);