Commits

Atte Kojo committed 5766bb7

Moved the continuum into its own module.

  • Participants
  • Parent commits 7377159

Comments (0)

Files changed (1)

File memcached.ml

 
 let ws = regexp "[ \t\r\n]+"
 
+(* Value type signature required by the interface *)
 module type Value = sig
     type t
     val to_string: t -> string
     val of_string: string -> t
 end
 
+(* Cache type signature required by the interface *)
 module type S = sig
     type +'a t
     type value
     val stats: 'a t -> (string * int) -> (string * string) list
 end
 
-module Memcached_impl (Value : sig
-    type 'a t
-    val to_string: 'a t -> string
-    val of_string: string -> 'a t
-end) = struct
-    
-    module ConnectionKey = struct
+(* The data structure holding the key-server mapping, i.e. the continuum. Uses
+ * consistent hashing to map keys to servers to minimize the effect of
+ * adding/removing servers. *)
+module Cnt = struct
+    module ConnKey = struct
         type t = string * int
         let compare = Pervasives.compare
     end
-    module ConnMap = Map.Make(ConnectionKey)
+    module ConnMap = Map.Make(ConnKey)
 
-    type connection = {
-        input: in_channel;
-        output: out_channel;
-    }
     type 'a t = {
-        connections: connection ConnMap.t;
-        continuum: (int * connection) array;
+        nservers: int;
+        connections: 'a ConnMap.t;
+        continuum: (int * 'a) array;
     }
 
-    (* Functions for handling the key-to-server mapping, i.e the continuum *)
-
-    let nservers = 200
-
-    let search cmp v ary = 
-        let len = Array.length ary in
-        let rec binsearch v ary first last =
-            if last <= first then
-                match cmp v ary.(first) with
-                | 1 -> if first == len - 1 then 0 else first + 1
-                | _ -> first
-                else
-                    let mid = first + (last - first) / 2 in
-                    match cmp v ary.(mid) with
-                    | 1 -> binsearch v ary (mid + 1) last
-                    | -1 -> binsearch v ary first (mid - 1)
-                    | _ -> mid in
-        binsearch v ary 0 (len - 1)
-
-    let cont_find key continuum =
-        let hash = mm_hash2 key in
-        let cmp v h2 = compare v (fst h2) in
-        let idx = search cmp hash continuum in
-        snd continuum.(idx)
-
-    let cont_create conn_map =
+    (* Internal functions *)
+    let create map nservers =
         let rec gen_hashes str count =
             match count with
             | 0 -> []
             let host = name ^ string_of_int port in
             List.map (fun n -> (n, conn)) (gen_hashes host nservers) in
         let cmp h1 h2 = compare (fst h1) (fst h2) in
-        let conns = ConnMap.fold (fun k v a -> gen_conns k v @ a) conn_map [] in
+        let conns = ConnMap.fold (fun k v a -> gen_conns k v @ a) map [] in
         Array.of_list (List.sort cmp conns)
 
+    let search cmp v ary = 
+        let len = Array.length ary in
+        let rec binsearch v ary first last =
+            if last <= first then
+                match cmp v ary.(first) with
+                | 1 -> if first == len - 1 then 0 else first + 1
+                | _ -> first
+            else
+                let mid = first + (last - first) / 2 in
+                match cmp v ary.(mid) with
+                | 1 -> binsearch v ary (mid + 1) last
+                | -1 -> binsearch v ary first (mid - 1)
+                | _ -> mid in
+        binsearch v ary 0 (len - 1)
+
+    (* Public interface *)
+    let empty n =
+        {
+            nservers = n;
+            connections = ConnMap.empty;
+            continuum = [||];
+        }
+
+    let add host connection c =
+        let new_connections = ConnMap.add host connection c.connections in
+        {
+            nservers = c.nservers;
+            connections = new_connections;
+            continuum = create new_connections c.nservers
+        }
+
+    let remove host c =
+        let new_connections = ConnMap.remove host c.connections in
+        {
+            nservers = c.nservers;
+            connections = new_connections;
+            continuum = create new_connections c.nservers
+        }
+
+    let connection_for key c =
+        if (ConnMap.is_empty c.connections) then
+            failwith "No servers"
+        else
+            let hash = mm_hash2 key in
+            let cmp v h2 = compare v (fst h2) in
+            let idx = search cmp hash c.continuum in
+            snd c.continuum.(idx)
+
+    let find host c =
+        ConnMap.find host c.connections
+end
+
+
+(* The actual implementation of the memcached client protocol *)
+module Memcached_impl (Value : sig
+    type 'a t
+    val to_string: 'a t -> string
+    val of_string: string -> 'a t
+end) = struct
+
+    type connection = {
+        input: in_channel;
+        output: out_channel;
+    }
+
+    type 'a t = connection Cnt.t
+
+    let nservers = 200
+
     (* Internal helper functions for handling communications with the memcached
      * server *)
 
-    let conn_for_key cache key =
-        if (ConnMap.is_empty cache.connections) then
-            failwith "No servers"
-        else
-            cont_find key cache.continuum
-
     let write_line conn line =
         output_string conn.output (line ^ "\r\n");
         flush conn.output
         | None -> []
 
     let store cmd cache expires key data =
-        let conn = conn_for_key cache key in
+        let conn = Cnt.connection_for key cache in
         let datastr = Value.to_string data in
         let len = String.length datastr in
         write_line conn (sprintf "%s %s 0 %d %d" cmd key expires len);
         | _ -> failwith cmd
 
     let arith cmd cache key value =
-        let conn = conn_for_key cache key in
+        let conn = Cnt.connection_for key cache in
         write_line conn (sprintf "%s %s %d" cmd key value);
         match List.hd (read_line conn) with
         | "NOT_FOUND" -> None
 
     (* External interface *)
 
-    let create () =
-        let cache = { connections = ConnMap.empty; continuum = [||] } in
-        cache
+    let create () = Cnt.empty nservers
 
     let connect cache (hostname, port) =
         let h_addr = (gethostbyname hostname).h_addr_list.(0) in
         let (input, output) = open_connection (ADDR_INET(h_addr, port)) in
         let conn = { input = input; output = output } in
         let () = Gc.finalise connection_finalizer conn in
-        let new_conns = ConnMap.add (hostname, port) conn cache.connections in
-        { connections = new_conns; continuum = cont_create new_conns; }
+        Cnt.add (hostname, port) conn cache
 
-    let disconnect cache host =
+    let disconnect cache (hostname, port) =
         (* No need to disconnect here since the finalizer will do it. *)
-        let new_conns = ConnMap.remove host cache.connections in
-        { connections = new_conns; continuum = cont_create new_conns }
+        Cnt.remove (hostname, port) cache
 
     let get cache key =
-        let conn = conn_for_key cache key in
+        let conn = Cnt.connection_for key cache in
         write_line conn ("get " ^ key);
         match (read_list read_value conn) with
         | [] -> None
         store "replace" cache expires key data
 
     let delete cache ?(wait_time = 0) key =
-        let conn = conn_for_key cache key in
+        let conn = Cnt.connection_for key cache in
         write_line conn (sprintf "delete %s %d" key wait_time);
         match read_line conn with
         | ["DELETED"] -> true
     let decr cache key value = arith "decr" cache key value
 
     let stats cache host =
-        let conn = ConnMap.find host cache.connections in
+        let conn = Cnt.find host cache in
         write_line conn "stats";
         read_list read_stat conn
 end