neilconway / overlog-paxos
A clean implementation of the Paxos consensus protocol in Overlog, a language for distributed computing.
Clone this repository (size: 4.5 MB): HTTPS / SSH
$ hg clone http://bitbucket.org/neilconway/overlog-paxos/
| commit 29: | 7ba2c54468af |
| parent 28: | 8e5f83fc7ce8 |
| branch: | default |
| tags: | tip |
comments for source and sink
4 months ago
| r29:7ba2c54468af | 164 loc | 4.2 KB | embed / history / annotate / raw / |
|---|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 | program paxos_propose;
import java.lang.System;
// max_proposal: for every (master, sequence number) pair, the largest view
// number under which paxos_prepare::prepare_oklist holds a "Proposed" entry.
define(max_proposal, keys(0, 1), {
    String,   // Master
    Integer,  // Sequence number
    Integer   // Max view number
});

public
max_proposal(Leader, Seq, max<V>) :-
    paxos_prepare::prepare_oklist(Leader, V, Seq, _, "Proposed", _, _);
/******** send_propose block *********************/
// send_propose: proposal tuples a master derives for each agent.
// Columns are (Agent, Master, View, ARU, Update); the key is the first
// three columns (Agent, Master, View).
define(send_propose, keys(0, 1, 2), {
String, // Agent
String, // Master
Integer, // View number
Integer, // ARU
String // Update
});
// Constrained update
// When duty_cycle holds at Master, derive a send_propose for every Agent
// listed in paxos::parliament, carrying an Update that prepare_oklist
// records as "Proposed" at the max_proposal view for its sequence number.
// MyView comes from paxos_prepare::quorum and Aru from
// paxos_prepare::local_aru.
// NOTE(review): SeqNo (from max_proposal / prepare_oklist) is never joined
// against Aru, so the head's fourth column is the local ARU rather than the
// proposal's own sequence number -- confirm this is intended.
public
send_propose(@Agent, Master, MyView, Aru, Update) :-
duty_cycle(@Master),
paxos::parliament(@Master, Agent),
max_proposal(@Master, SeqNo, View),
paxos_prepare::quorum(@Master, MyView),
paxos_prepare::local_aru(@Master, Aru),
paxos_prepare::prepare_oklist(@Master, View, SeqNo, Update, "Proposed", _, _);
// dequeue on the duty cycle only, not on deltas to local_aru, etc.
// Timer t is declared with the arguments (physical, 25, infinity, 0).
// NOTE(review): presumably a 25 ms period, repeating forever, with no
// initial delay -- confirm against the engine's timer() semantics.
timer(t, physical, 25, infinity, 0);
// duty_cycle(Me): single-column table holding the local node's address.
define(duty_cycle, {String});
// Derived for the local node (paxos::self) whenever a t() tuple is present.
duty_cycle(Me) :-
paxos::self(Me),
t();
// Unconstrained update
// When duty_cycle holds at Master and prepare_oklist has no "Proposed"
// entry for View, derive a send_propose for every Agent in
// paxos::parliament. View must appear in both paxos_prepare::quorum and
// leader::last_installed for Master. The Update is taken from the q entry
// whose Id matches top_q (the minimum Id in q).
// R (the requestor column of q) is bound but not used in the head.
send_propose(@Agent, Master, View, Aru, Update) :-
duty_cycle(@Master),
paxos::parliament(@Master, Agent),
notin paxos_prepare::prepare_oklist(@Master, View, _, _, "Proposed", _, _),
paxos_prepare::quorum(@Master, View),
leader::last_installed(@Master, View),
paxos_prepare::local_aru(@Master, Aru),
q(@Master, Update, R, Id),
top_q(@Master, Id);
/************************************/
/********* queue block **************/
/*
A client application generates its own unique ids and inserts
tuples into q().
Reading back the update from global_history() confirms that
it was successfully globally ordered.
*/
// q: queue of pending client updates.
// Columns are (Agent, Update, Requestor, Id); the key is the first three
// columns, so Id is the only non-key column.
define(q, keys(0, 1, 2), {
String, // Agent
String, // Update
String, // Requestor
Long // Unique id
});
// top_q: per agent, the smallest unique id currently present in q.
define(top_q, keys(0), {
    String,  // Agent
    Long     // Minimum id
});

public
top_q(Node, min<Uid>) :-
    q(Node, _, _, Uid);
// Remove a queued update once globally_ordered holds a tuple for the same
// node and the same update string (any view, any sequence number).
delete
q(Node, Cmd, Requestor, Uid) :-
    q(Node, Cmd, Requestor, Uid),
    globally_ordered(Node, _, _, Cmd);
/************************************/
/********* accept block **************/
// For each send_propose tuple at Agent whose (sequence number, update) pair
// is not yet in paxos_prepare::global_history, derive an accept tuple at
// every Other node listed in paxos::parliament for Agent.
// The fourth column of send_propose (declared as ARU) is used here as the
// accept's sequence number.
// NOTE(review): paxos::parliament is referenced with three arguments here
// but with two in both send_propose rules -- confirm which arity matches
// the table's declaration.
public
paxos_prepare::accept(@Other, Agent, View, SeqNo, Update) :-
send_propose(@Agent, _, View, SeqNo, Update),
notin paxos_prepare::global_history(@Agent, SeqNo, _, Update),
paxos::parliament(@Agent, Other, _);
// Drop an accept tuple when the same node holds another accept for the same
// sequence number under a strictly higher view.
public
delete
paxos_prepare::accept(Node, Leader, OldView, Seq, Cmd) :-
    paxos_prepare::accept(Node, Leader, OldView, Seq, Cmd),
    paxos_prepare::accept(Node, _, NewView, Seq, _),
    NewView > OldView;
// Drop an accept tuple once globally_ordered holds the same
// (node, sequence number, update) under any view.
public
delete
paxos_prepare::accept(Node, Leader, V, Seq, Cmd) :-
    paxos_prepare::accept(Node, Leader, V, Seq, Cmd),
    globally_ordered(Node, _, Seq, Cmd);
// accept_cnt: per (agent, view, sequence number), how many accept tuples
// the agent holds. The key is the first three columns.
define(accept_cnt, keys(0, 1, 2), {
String, // Agent
Integer, // View number
Integer, // Sequence number
Integer // Count of accepts
});
// Counts the second column of paxos_prepare::accept (the sending agent),
// grouped by (Me, View, SeqNo).
public
accept_cnt(Me, View, SeqNo, count<Agent>) :-
paxos_prepare::accept(Me, Agent, View, SeqNo, _);
/************************************/
// globally_ordered: updates that have gathered enough accepts.
// Columns are (Agent, View, SeqNo, Update); no explicit key is declared.
define(globally_ordered, {
String, // Agent
Integer, // View number
Integer, // Sequence number
String // Update
});
// An update becomes globally ordered at Me when the accept count for
// (View, SeqNo) exceeds half of paxos::priestCnt and Me holds a matching
// send_propose tuple supplying the Update.
// The fourth column of send_propose (declared as ARU) is matched against
// SeqNo here.
// NOTE(review): assumes PCnt is an Integer, so PCnt / 2 truncates and the
// test is a strict majority -- confirm against the priestCnt declaration.
globally_ordered(Me, View, SeqNo, Update) :-
accept_cnt(Me, View, SeqNo, Cnt),
paxos::priestCnt(Me, PCnt),
Cnt > (PCnt / 2),
send_propose(Me, _, View, SeqNo, Update);
/* actions taken on tables defined in earlier programs: */
// Re-derive leader::progress_timer for Me with a fresh start time whenever
// Me has an existing progress_timer tuple and at least one
// paxos_prepare::global_history tuple. Duration is read from
// leader::progress_timer_start, and Start is the current wall-clock time in
// milliseconds from java.lang.System.
// NOTE(review): presumably this restarts the timer as progress is made;
// the key of leader::progress_timer is not visible here -- confirm the new
// tuple replaces the old one rather than accumulating.
leader::progress_timer(Me, Start, Duration) :-
leader::progress_timer(Me, _, _),
leader::progress_timer_start(Duration),
paxos_prepare::global_history(Me, _, _, _),
Start := System.currentTimeMillis();
// hmax: per agent, the largest sequence number present in
// paxos_prepare::global_history.
define(hmax, keys(0), {
    String,  // Agent
    Integer  // Max sequence number
});

public
hmax(Node, max<Seq>) :-
    paxos_prepare::global_history(Node, Seq, _, _);
// The local ARU is one past the highest sequence number recorded in hmax.
public
paxos_prepare::local_aru(Node, Highest + 1) :-
    hmax(Node, Highest);
// Copy every globally_ordered tuple into paxos_prepare::global_history,
// dropping the view number. The Requestor column is filled with the
// literal "?" because globally_ordered carries no requestor.
public
paxos_prepare::global_history(Agent, SeqNo, Requestor, Update) :-
globally_ordered(Agent, _, SeqNo, Update),
Requestor := "?";
// Drop a prepare_oklist entry once globally_ordered holds the same update
// at the same master (any view, any sequence number).
public
delete
paxos_prepare::prepare_oklist(Leader, V, Seq, Cmd, Kind, Length, Node) :-
    paxos_prepare::prepare_oklist(Leader, V, Seq, Cmd, Kind, Length, Node),
    globally_ordered(Leader, _, _, Cmd);
|
