Commits

Yoshifumi YAMAGUCHI  committed 9af91e4

reading manager.Manager.insert

  • Participants
  • Parent commits 5a63792
  • Branches develop

Comments (0)

Files changed (5)

File notes/source/conf.py

 # The name of an image file (within the static path) to use as favicon of the
 # docs.  This file should be a Windows icon file (.ico) being 16x16 or 32x32
 # pixels large.
-#html_favicon = None
+html_favicon = "img/favicon.ico"
 
 # Add any paths that contain custom static files (such as style sheets) here,
 # relative to this directory. They are copied after the builtin static files,

File notes/source/img/favicon.ico

Added
New image

File notes/source/pyhs/manager.rst

 
    HandlerSocketの高レベルなクライアントクラス。
 
+.. py:attribute:: read_socket
+
+   :py:class:`sockets.ReadSocket` のインスタンス
+
+.. py:attribute:: write_socket
+
+   :py:class:`sockets.WriteSocket` のインスタンス
 
 .. py:method:: __init__
 
    :param write_servers: HandlerSocketの書き込みインスタンスを定義する
                          タプルのリスト
 
-   - :meth:`sockets.ReadSocket` 呼び出し
-   - :meth:`sockets.WriteSocket` 呼び出し
+   - :py:class:`ReadSocket` の作成
+   - :py:class:`WriteSocket` の作成
+
+   **実装**
 
    .. code-block:: python
 
           write_servers = write_servers or [('inet', 'localhost', 9999)]
           self.read_socket = sockets.ReadSocket(read_servers, debug)
           self.write_socket = sockets.WriteSocket(write_servers, debug)
+
+   **呼び出し**
+   
+   - :py:meth:`sockets.HandlerSocket.__init__`
+
+
+.. py:method:: insert
+
+   **概要**
+
+   :param string db: データベース名
+   :param string table: テーブル名
+   :param fields: ``table`` に入れる (カラム名, 値) のペアのリスト
+   :type fields: list of tuple
+   :param index_name: インデックスの名前。デフォルトは ``PRIMARY``
+   :type index_name: string or None
+   :rtype: bool
+
+
+   **実装**
+
+   .. code-block:: python
+
+      @retry_on_failure
+      def insert(self, db, table, fields, index_name=None):
+          keys, values = zip(*fields)
+          index_id = self.write_socket.get_index_id(db, table, keys, index_name)
+          data = self.write_socket.insert(index_id, values)
+  
+          return data
+  
+
+   **呼び出し**
+
+   - :py:meth:`utils.retry_on_failure`
+   - :py:meth:`sockets.WriteSocket.get_index_id`
+   - :py:meth:`sockets.WriteSocket.insert`

File notes/source/pyhs/sockets.rst

    - 読み込み用のHandlerSocketクライアント
    - :py:class:`HandlerSocket` クラスを継承
 
+   **呼び出し元**
 
+   - :py:class:`manager.Manager`
+
+   **継承先**
+
+   - :py:class:`HandlerSocket`
+
+
+WriteSocket
+===========
+
+.. py:class:: WriteSocket
+
+   **概要**
+   
+   - 読み込み用のHandlerSocketクライアント
+   - :py:class:`HandlerSocket` クラスを継承
+
+   **呼び出し元**
+
+   - :py:class:`manager.Manager`
+
+   **継承先**
+
+   - :py:class:`HandlerSocket`
+
+
+   .. py:method:: get_index_id
+
+      **呼び出し元**
+
+      - :py:meth:`manager.Manager.insert`
+
+
+   .. py:method:: insert
+
+      **概要**
+
+      :param integer index_id: 開いたインデックスのid
+      :param list columns: 挿入するカラムの値のリスト。
+                           カラムが定義された順に値を入れないとだめ。
+      :rtype: bool
+
+      - ``columns`` が空でないiterableか確認する
+
+      **実装**
+
+      .. code-block:: python
+
+         def insert(self, index_id, columns):
+             if not check_columns(columns):
+                 raise ValueError('Columns must be a non-empty iterable.')
+     
+             query = chain(
+                 (str(index_id), '+', str(len(columns))),
+                 imap(encode, columns)
+             )
+     
+             self._call(index_id, query, force_index=True)
+     
+             return True
+  
+
+     **呼び出し**
+
+      - :py:func:`utils.check_columns`
+      - :py:meth:`HandlerSocket._call`
+
+      **呼び出し元**
+
+      - :py:meth:`manager.Manager.insert`
+
+
+   
 
 HandlerSocket
 =============
    .. py:attribute:: connections
    
       :py:class:`Connection` のインスタンスを持ったリスト
-   
+
+   .. py:attribute:: index_map
+
+   .. py:attribute:: current_index_id
+
+   .. py:attribute:: index_cache
+
+   .. py:attribute:: last_connection_exception
+
    .. py:method:: __init__
    
       :param iterable servers: サーバデータを定義するリスト
    
       **実装**
    
-      .. code-blocK:: python
+      .. code-block:: python
    
          def __init__(self, servers, debug=False):
              self.connections = []
       - :py:meth:`Connection.__init__`
       - :py:meth:`Connection.set_debug_mode`
       - :py:meth:`_clear_caches`
+
+      **継承先**
+
+      - :py:class:`ReadSocket`
+
+
+
+   .. py:method:: _clear_chaches
+
+      **概要**
+
+      - 各種属性を初期化
+
+      **実装**
+
+      .. code-block:: python
+
+         def _clear_caches(self):
+           self.index_map = {}
+           self.current_index_id = 0
+           self.index_cache = {}
+           self.last_connection_exception = None
    
+         
+      **呼び出し元**
+
+      - :py:meth:`__init__`
+
+   .. py:method:: _get_connection
+   
+      **概要**
+
+      :param index_id: コネクションを張るインデックスid
+      :type index_id: integer or None
+      :param bool force_index: ``True`` であれば ``index_id`` を開けるために使われたコネクションのみを返す
+      :rtype: :py:class:`Connection` インスタンス
+
+	  - コネクションプールからアクティブなコネクションを持ってくる
+	  - コネクションに失敗したらリトライして持ってくる
+	  - サーバ全部に対してコネクション失敗したら例外を返す
+	  - ``force_index`` が設定されていたら ``index_id`` に対してのみコネクションしてみる
+
+      **実装**
+
+      .. code-block:: python
+
+          def _get_connection(self, index_id=None, force_index=False):
+              connections = self.connections[:]
+              random.shuffle(connections)
+              # Try looking up for index_id in index_map - we should use same connections
+              # for opened indexes and operations using them
+              if index_id is not None and index_id in self.index_map:
+                  conn = self.index_map[index_id]
+              else:
+                  if force_index:
+                      raise OperationalError('There is no connection with given index id "%d"' % index_id)
+                  conn = connections.pop()
+      
+              exception = lambda exc: ConnectionError('Could not connect to any of given servers: %s'\
+                                        % exc.args[0])
+              # Retry until either limit is reached or all connections tried
+              for i in range(max(self.RETRY_LIMIT, len(connections))):
+                  try:
+                      if conn.is_ready():
+                          conn.connect()
+                          break
+                  except ConnectionError, e:
+                      self.last_connection_exception = e
+                      # In case indexed connection is forced remove it from the caches
+                      # and raise exception so higher level code could retry whole operation
+                      if force_index:
+                          self.purge_index(index_id)
+                          if connections:
+                              raise RecoverableConnectionError('Could not use connection with given index id "%d"' % index_id)
+                          else:
+                              # No point retrying if no more connections are available
+                              raise exception(self.last_connection_exception)
+                  if connections:
+                      conn = connections.pop()
+              else:
+                  raise exception(self.last_connection_exception)
+      
+              # If we have an index id, save a relation between it and a connection
+              if index_id is not None:
+                  self.index_map[index_id] = conn
+              return conn
+
+      **呼び出し**
+
+      - :py:data:`connections`
+      - :py:data:`index_map`
+      - :py:meth:`Connection.is_ready`
+      - :py:meth:`Connection.connect`
+      - :py:meth:`purge_index`
+
+      **呼び出し元**
+
+      - :py:meth:`_call`
+
+
+   .. py:method:: _call
+
+      **概要**
+
+      :param integer index_id: 操作を行なうid
+      :param iterable query: サーバに送信するlist/iterableのトークン
+      :param bool force_index: 操作に ``index_id`` が必要な場合は ``True``
+                               インデックスを開く時以外は必要。
+                               :py:meth:`_get_connection` 参照
+      :rtype: リスト
+
+	  - HandlerSocketサーバとのデータのやり取りを実際に行なうヘルパー関数
+
+	  **実装**
+
+      .. code-block:: python
+
+         def _call(self, index_id, query, force_index=False):
+             conn = self._get_connection(index_id, force_index)
+             try:
+                 conn.send('\t'.join(query)+'\n')
+                 response = self._parse_response(conn.readline())
+             except ConnectionError, e:
+                 self.purge_index(index_id)
+                 raise e
+     
+             return response
+     
+      **呼び出し**
+
+      - :py:meth:`_get_connection`
+      - :py:meth:`purge_index`
+
+	  **呼び出し元**
+
+	  - :py:meth:`WriteSocket.insert`
+
 
 Connection
 ==========
 
    .. py:attribute:: address
 
-      :py:meth:`__init__` で初期化される。プロトコルが :py:data:`UNIX_PROTO` の場合に
-      :py:data:`host` が入り、 :py:data:`INET_PROTO` の場合に :py:data:`host` と
-      :py:data:`port` のタプルが入る。
+      - :py:meth:`__init__` で初期化される。
+        プロトコルが :py:data:`UNIX_PROTO` の場合に :py:data:`host` が入り、
+        :py:data:`INET_PROTO` の場合に :py:data:`host` と :py:data:`port` のタプルが入る。
 
    .. py:attribute:: socket
 
-      :py:meth:`__init__` で ``None`` に初期化される。
+      - :py:meth:`__init__` で ``None`` に初期化される。
 
    .. py:attribute:: retry_time
 
-      :py:meth:`__init__` で 0 に初期化される。
+      - :py:meth:`__init__` で 0 に初期化される。
+      - :py:meth:`_die` で新しい値を設定される
+      - :py:meth:`is_ready` でリトライ時間を過ぎているかに使われる。
 
    .. py:attribute:: debug
 
-      :py:meth:`__init__` で ``False`` に初期化される。
+      - :py:meth:`__init__` で ``False`` に初期化される。
+      - :py:meth:`set_debug_mode` で設定する
    
    .. py:method:: __init__
    
 
       :param bool mode: デバッグモード
 
-      - :py:data:`debug` を切り替える
+      - :py:attr:`debug` を切り替える
 
       **実装**
 
       **呼び出し元**
 
       - :py:meth:`HandlerSocket.__init__`
+
+
+   .. py:method:: is_ready
+
+      **概要**
+
+      :rtype: bool
+
+      - コネクションインスタンスが準備できているか確認する
+	  - :py:attr:`retry_time` が未来なら準備できていないと判断する
+
+      **実装**
+
+      .. code-block:: python
+
+         def is_ready(self):
+             if self.retry_time and self.retry_time > time.time():
+                 return False
+             self.retry_time = 0
+             return True
+     
+      **呼び出し元**
+
+      - :py:meth:`HandlerSocket._get_connection`
+
+
+   .. py:method:: connect
+
+      **概要**
+
+      - 新しいソケットでコネクションを確立する
+	  - 既存のコネクションがあったら何もしない
+	  - `Nagle algrithm <http://developers.slashdot.org/comments.pl?sid=174457&threshold=1&commentsort=0&mode=thread&cid=14515105>`_ というのを殺しているらしい
+      - :py:attr:`socket` がすでにあればなにもしないで返す
+      - ソケットを :py:attr:`protocol` でTCP接続し、タイムアウトを設定。
+        それを :py:attr:`socket` に代入。
+      - 接続時にエラーがあったら :py:meth:`_die` を呼ぶ。
+
+      **実装**
+
+      .. code-block:: python
+
+         def connect(self):
+             if self.socket:
+                 return
+     
+             try:
+                 sock = socket.socket(self.protocol, socket.SOCK_STREAM)
+                 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+                 sock.settimeout(self.timeout)
+                 sock.connect(self.address)
+             except socket.error, e:
+                 self._die(e, 'Connection error')
+     
+             self.socket = sock
+     
+      **呼び出し**
+
+      - :py:meth:`_die`
+
+      **呼び出し元**
+
+      - :py:meth:`HandlerSocket._get_connection`
+
+
+   .. py:method:: _die
+
+      **概要**
+
+      :param e: コネクション失敗の時に発生した例外
+      :type e: :exc:`socket.error`
+      :param msg: 追加の例外メッセージ
+      :type msg: string or None
+
+      - ホストから切断しリトライの時間を設定する
+
+      **実装**
+
+      .. code-block:: python
+
+         def _die(self, e, msg='Socket error'):
+             self.retry_time = time.time() + self.RETRY_INTERVAL
+             self.disconnect()
+     
+             exmsg = len(e.args) == 1 and e.args[0] or e.args[1]
+             raise ConnectionError("%s: %s" % (msg, exmsg))
+
+      **呼び出し**
+
+      - :py:attr:`retry_time`
+      - :py:meth:`disconnect`
+
+      **呼び出し元**
+
+      - :py:meth:`connect`

File notes/source/pyhs/utils.rst

 =======
  utils
 =======
+
+.. py:module:: utils
+
+.. py:function:: check_columns
+
+   **概要**
+
+   :param columns: チェックする値
+   :type columns: iterable
+   :rtype: bool
+
+   - 単純に引数が空でないiterableか確認するだけ
+
+   **実装**
+
+   .. code-block:: python
+
+      def check_coumns(columns):
+          if not hasattr(columns, '__iter__') or not len(columns):
+              return False
+          return True
+    
+   **呼び出し元**
+
+   - :py:meth:`sockets.WriteSocket.insert`
+