Source

python-cloudfiles / cloudfiles / connection.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
"""
connection operations

Connection instances are used to communicate with the remote service at
the account level creating, listing and deleting Containers, and returning
Container instances.

See COPYING for license information.
"""

import  socket
import  os
from    urllib    import urlencode
from    httplib   import HTTPSConnection, HTTPConnection, HTTPException
from    container import Container, ContainerResults
from    utils     import unicode_quote, parse_url, THTTPConnection, THTTPSConnection
from    errors    import ResponseError, NoSuchContainer, ContainerNotEmpty, \
                         InvalidContainerName, CDNNotEnabled, ContainerExists
from    Queue     import Queue, Empty, Full
from    time      import time
import  consts
from    authentication import Authentication
from    fjson     import json_loads
from    sys       import version_info
# Because HTTPResponse objects *have* to have read() called on them
# before they can be used again ...
# pylint: disable-msg=W0612


class Connection(object):
    """
    Manages the connection to the storage system and serves as a factory
    for Container instances.

    @undocumented: cdn_connect
    @undocumented: http_connect
    @undocumented: cdn_request
    @undocumented: make_request
    @undocumented: _check_container_name
    """

    def __init__(self, username=None, api_key=None, timeout=5, **kwargs):
        """
        Accepts keyword arguments for Mosso username and api key.
        Optionally, you can omit these keywords and supply an
        Authentication object using the auth keyword. Setting the argument
        servicenet to True will make use of Rackspace servicenet network.

        @type username: str
        @param username: a Mosso username
        @type api_key: str
        @param api_key: a Mosso API key
        @type servicenet: bool
        @param servicenet: Use Rackspace servicenet to access Cloud Files.
        @type cdn_log_retention: bool
        @param cdn_log_retention: set logs retention for this cdn enabled
        container.
        """
        self.cdn_enabled = False
        self.cdn_args = None
        self.connection_args = None
        self.cdn_connection = None
        self.connection = None
        self.token = None
        self.debuglevel = int(kwargs.get('debuglevel', 0))
        self.servicenet = kwargs.get('servicenet', False)
        self.user_agent = kwargs.get('useragent', consts.user_agent)
        self.timeout = timeout

        # if the environement variable RACKSPACE_SERVICENET is set (to
        # anything) it will automatically set servicenet=True
        if not 'servicenet' in kwargs \
                and 'RACKSPACE_SERVICENET' in os.environ:
            self.servicenet = True

        self.auth = 'auth' in kwargs and kwargs['auth'] or None

        if not self.auth:
            authurl = kwargs.get('authurl', consts.us_authurl)
            if username and api_key and authurl:
                self.auth = Authentication(username, api_key, authurl=authurl,
                            useragent=self.user_agent, timeout=self.timeout)
            else:
                raise TypeError("Incorrect or invalid arguments supplied")
        self._authenticate()
    def _authenticate(self):
        """
        Authenticate and setup this instance with the values returned.
        """
        (url, self.cdn_url, self.token) = self.auth.authenticate()
        url = self._set_storage_url(url)
        self.connection_args = parse_url(url)

        if version_info[0] <= 2 and version_info[1] < 6:
            self.conn_class = self.connection_args[3] and THTTPSConnection or \
                                                              THTTPConnection
        else:
            self.conn_class = self.connection_args[3] and HTTPSConnection or \
                                                              HTTPConnection
        self.http_connect()
        if self.cdn_url:
            self.cdn_connect()

    def _set_storage_url(self, url):
        if self.servicenet:
            return "https://snet-%s" % url.replace("https://", "")
        return url

    def cdn_connect(self):
        """
        Setup the http connection instance for the CDN service.
        """
        (host, port, cdn_uri, is_ssl) = parse_url(self.cdn_url)
        self.cdn_connection = self.conn_class(host, port, timeout=self.timeout)
        self.cdn_enabled = True

    def http_connect(self):
        """
        Setup the http connection instance.
        """
        (host, port, self.uri, is_ssl) = self.connection_args
        self.connection = self.conn_class(host, port=port, \
                                              timeout=self.timeout)
        self.connection.set_debuglevel(self.debuglevel)

    def cdn_request(self, method, path=[], data='', hdrs=None):
        """
        Given a method (i.e. GET, PUT, POST, etc), a path, data, header and
        metadata dicts, performs an http request against the CDN service.
        """
        if not self.cdn_enabled:
            raise CDNNotEnabled()

        path = '/%s/%s' % \
                 (self.uri.rstrip('/'), '/'.join([unicode_quote(i) for i in path]))
        headers = {'Content-Length': str(len(data)),
                   'User-Agent': self.user_agent,
                   'X-Auth-Token': self.token}
        if isinstance(hdrs, dict):
            headers.update(hdrs)

        def retry_request():
            '''Re-connect and re-try a failed request once'''
            self.cdn_connect()
            self.cdn_connection.request(method, path, data, headers)
            return self.cdn_connection.getresponse()

        try:
            self.cdn_connection.request(method, path, data, headers)
            response = self.cdn_connection.getresponse()
        except (socket.error, IOError, HTTPException):
            response = retry_request()
        if response.status == 401:
            self._authenticate()
            headers['X-Auth-Token'] = self.token
            response = retry_request()

        return response

    def make_request(self, method, path=[], data='', hdrs=None, parms=None):
        """
        Given a method (i.e. GET, PUT, POST, etc), a path, data, header and
        metadata dicts, and an optional dictionary of query parameters,
        performs an http request.
        """
        path = '/%s/%s' % \
                 (self.uri.rstrip('/'), '/'.join([unicode_quote(i) for i in path]))

        if isinstance(parms, dict) and parms:
            path = '%s?%s' % (path, urlencode(parms))

        headers = {'Content-Length': str(len(data)),
                   'User-Agent': self.user_agent,
                   'X-Auth-Token': self.token}
        isinstance(hdrs, dict) and headers.update(hdrs)

        def retry_request():
            '''Re-connect and re-try a failed request once'''
            self.http_connect()
            self.connection.request(method, path, data, headers)
            return self.connection.getresponse()

        try:
            self.connection.request(method, path, data, headers)
            response = self.connection.getresponse()
        except (socket.error, IOError, HTTPException):
            response = retry_request()
        if response.status == 401:
            self._authenticate()
            headers['X-Auth-Token'] = self.token
            response = retry_request()

        return response

    def get_info(self):
        """
        Return tuple for number of containers, total bytes in the account and account metadata

        >>> connection.get_info()
        (5, 2309749)

        @rtype: tuple
        @return: a tuple containing the number of containers, total bytes
                 used by the account and a dictionary containing account metadata
        """
        response = self.make_request('HEAD')
        count = size = None
        metadata = {}
        for hdr in response.getheaders():
            if hdr[0].lower() == 'x-account-container-count':
                try:
                    count = int(hdr[1])
                except ValueError:
                    count = 0
            if hdr[0].lower() == 'x-account-bytes-used':
                try:
                    size = int(hdr[1])
                except ValueError:
                    size = 0
            if hdr[0].lower().startswith('x-account-meta-'):
                metadata[hdr[0].lower()[15:]] = hdr[1]
        buff = response.read()
        if (response.status < 200) or (response.status > 299):
            raise ResponseError(response.status, response.reason)
        return (count, size, metadata)

    def update_account_metadata(self, metadata):
        """
        Update account metadata
        >>> metadata = {'x-account-meta-foo' : 'bar'}
        >>> connection.update_account_metadata(metadata)

        @param metadata: Dictionary of metadata
        @type metdada: dict
        """
        response = self.make_request('POST', hdrs=metadata)
        response.read()
        if (response.status < 200) or (response.status > 299):
           raise ResponseError(response.status, response.reason)

    def _check_container_name(self, container_name):
        if not container_name or \
                '/' in container_name or \
                len(container_name) > consts.container_name_limit:
            raise InvalidContainerName(container_name)

    def create_container(self, container_name, error_on_existing=False):
        """
        Given a container name, returns a L{Container} item, creating a new
        Container if one does not already exist.

        >>> connection.create_container('new_container')
        <cloudfiles.container.Container object at 0xb77d628c>

        @param container_name: name of the container to create
        @type container_name: str
        @param error_on_existing: raise ContainerExists if container already
        exists
        @type error_on_existing: bool
        @rtype: L{Container}
        @return: an object representing the newly created container
        """
        self._check_container_name(container_name)

        response = self.make_request('PUT', [container_name])
        buff = response.read()
        if (response.status < 200) or (response.status > 299):
            raise ResponseError(response.status, response.reason)
        if error_on_existing and (response.status == 202):
            raise ContainerExists(container_name)
        return Container(self, container_name)

    def delete_container(self, container_name):
        """
        Given a container name, delete it.

        >>> connection.delete_container('old_container')

        @param container_name: name of the container to delete
        @type container_name: str
        """
        if isinstance(container_name, Container):
            container_name = container_name.name
        self._check_container_name(container_name)

        response = self.make_request('DELETE', [container_name])
        response.read()

        if (response.status == 409):
            raise ContainerNotEmpty(container_name)
        elif (response.status == 404):
            raise NoSuchContainer
        elif (response.status < 200) or (response.status > 299):
            raise ResponseError(response.status, response.reason)

        if self.cdn_enabled:
            response = self.cdn_request('POST', [container_name],
                                hdrs={'X-CDN-Enabled': 'False'})

    def get_all_containers(self, limit=None, marker=None, **parms):
        """
        Returns a Container item result set.

        >>> connection.get_all_containers()
        ContainerResults: 4 containers
        >>> print ', '.join([container.name for container in
                             connection.get_all_containers()])
        new_container, old_container, pictures, music

        @rtype: L{ContainerResults}
        @return: an iterable set of objects representing all containers on the
                 account
        @param limit: number of results to return, up to 10,000
        @type limit: int
        @param marker: return only results whose name is greater than "marker"
        @type marker: str
        """
        if limit:
            parms['limit'] = limit
        if marker:
            parms['marker'] = marker
        return ContainerResults(self, self.list_containers_info(**parms))

    def get_container(self, container_name):
        """
        Return a single Container item for the given Container.

        >>> connection.get_container('old_container')
        <cloudfiles.container.Container object at 0xb77d628c>
        >>> container = connection.get_container('old_container')
        >>> container.size_used
        23074

        @param container_name: name of the container to create
        @type container_name: str
        @rtype: L{Container}
        @return: an object representing the container
        """
        self._check_container_name(container_name)

        response = self.make_request('HEAD', [container_name])
        count = size = None
        metadata = {}
        for hdr in response.getheaders():
            if hdr[0].lower() == 'x-container-object-count':
                try:
                    count = int(hdr[1])
                except ValueError:
                    count = 0
            if hdr[0].lower() == 'x-container-bytes-used':
                try:
                    size = int(hdr[1])
                except ValueError:
                    size = 0
            if hdr[0].lower().startswith('x-container-meta-'):
                metadata[hdr[0].lower()[17:]] = hdr[1]
        buff = response.read()
        if response.status == 404:
            raise NoSuchContainer(container_name)
        if (response.status < 200) or (response.status > 299):
            raise ResponseError(response.status, response.reason)
        return Container(self, container_name, count, size, metadata)

    def list_public_containers(self):
        """
        Returns a list of containers that have been published to the CDN.

        >>> connection.list_public_containers()
        ['container1', 'container2', 'container3']

        @rtype: list(str)
        @return: a list of all CDN-enabled container names as strings
        """
        response = self.cdn_request('GET', [''])
        if (response.status < 200) or (response.status > 299):
            buff = response.read()
            raise ResponseError(response.status, response.reason)
        return response.read().splitlines()

    def list_containers_info(self, limit=None, marker=None, **parms):
        """
        Returns a list of Containers, including object count and size.

        >>> connection.list_containers_info()
        [{u'count': 510, u'bytes': 2081717, u'name': u'new_container'},
         {u'count': 12, u'bytes': 23074, u'name': u'old_container'},
         {u'count': 0, u'bytes': 0, u'name': u'container1'},
         {u'count': 0, u'bytes': 0, u'name': u'container2'},
         {u'count': 0, u'bytes': 0, u'name': u'container3'},
         {u'count': 3, u'bytes': 2306, u'name': u'test'}]

        @rtype: list({"name":"...", "count":..., "bytes":...})
        @return: a list of all container info as dictionaries with the
                 keys "name", "count", and "bytes"
        @param limit: number of results to return, up to 10,000
        @type limit: int
        @param marker: return only results whose name is greater than "marker"
        @type marker: str
        """
        if limit:
            parms['limit'] = limit
        if marker:
            parms['marker'] = marker
        parms['format'] = 'json'
        response = self.make_request('GET', [''], parms=parms)
        if (response.status < 200) or (response.status > 299):
            buff = response.read()
            raise ResponseError(response.status, response.reason)
        return json_loads(response.read())

    def list_containers(self, limit=None, marker=None, **parms):
        """
        Returns a list of Containers.

        >>> connection.list_containers()
        ['new_container',
         'old_container',
         'container1',
         'container2',
         'container3',
         'test']

        @rtype: list(str)
        @return: a list of all containers names as strings
        @param limit: number of results to return, up to 10,000
        @type limit: int
        @param marker: return only results whose name is greater than "marker"
        @type marker: str
        """
        if limit:
            parms['limit'] = limit
        if marker:
            parms['marker'] = marker
        response = self.make_request('GET', [''], parms=parms)
        if (response.status < 200) or (response.status > 299):
            buff = response.read()
            raise ResponseError(response.status, response.reason)
        return response.read().splitlines()

    def __getitem__(self, key):
        """
        Container objects can be grabbed from a connection using index
        syntax.

        >>> container = conn['old_container']
        >>> container.size_used
        23074

        @rtype: L{Container}
        @return: an object representing the container
        """
        return self.get_container(key)


class ConnectionPool(Queue):
    """
    A thread-safe connection pool object.

    This component isn't required when using the cloudfiles library, but it may
    be useful when building threaded applications.
    """

    def __init__(self, username=None, api_key=None, **kwargs):
        poolsize = kwargs.pop('poolsize', 10)
        self.connargs = {'username': username, 'api_key': api_key}
        self.connargs.update(kwargs)
        Queue.__init__(self, poolsize)

    def get(self):
        """
        Return a cloudfiles connection object.

        @rtype: L{Connection}
        @return: a cloudfiles connection object
        """
        try:
            (create, connobj) = Queue.get(self, block=0)
        except Empty:
            connobj = Connection(**self.connargs)
        return connobj

    def put(self, connobj):
        """
        Place a cloudfiles connection object back into the pool.

        @param connobj: a cloudfiles connection object
        @type connobj: L{Connection}
        """
        try:
            Queue.put(self, (time(), connobj), block=0)
        except Full:
            del connobj
# vim:set ai sw=4 ts=4 tw=0 expandtab: