cheesecake-service / commands_stream.py

import errno
import os
import socket

from config import COMMUNICATION_SOCKET_PATH

class CommandsStream(object):
    """Receiving end of the releases channel.

    Binds a non-blocking Unix datagram socket and turns each received
    datagram into a list of release tuples.
    """

    # Maximum number of bytes read per datagram. A datagram longer than
    # this is truncated by recv(), which would mangle the last release.
    RECV_BUFFER_SIZE = 65536

    def __init__(self, socket_path=None):
        """Bind the receiving socket.

        socket_path -- filesystem path to bind to; defaults to
                       COMMUNICATION_SOCKET_PATH when omitted.
        """
        if socket_path is None:
            socket_path = COMMUNICATION_SOCKET_PATH
        self.socket_path = socket_path

        # A leftover socket file would make bind() fail.
        self._remove_socket_file()

        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            self.sock.bind(self.socket_path)
            # Make socket non-blocking.
            self.sock.setblocking(False)
        except Exception:
            # Do not leak the descriptor when setup fails half-way.
            self.sock.close()
            raise

    def _remove_socket_file(self):
        """Delete the socket file, ignoring the case where it is absent."""
        try:
            os.remove(self.socket_path)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise

    def get_releases_list(self):
        """Return list of releases sent over the socket, each release
        in the form of a tuple (package, version).

        Reads at most one datagram per call. Returns an empty list when
        nothing is waiting or when reading from the socket fails.
        """
        try:
            data = self.sock.recv(self.RECV_BUFFER_SIZE)
        except socket.error:
            # Non-blocking socket with no pending datagram (or another
            # socket failure): report "no releases", as before.
            return []

        # recv() yields bytes on Python 3; convert to the native string
        # type so that splitting on '==' works on both major versions.
        if not isinstance(data, str):
            data = data.decode('utf-8', 'replace')

        return [tuple(line.split('==')) for line in data.splitlines()]

    def close(self):
        """Close the socket and remove its file; safe to call repeatedly."""
        try:
            self.sock.close()
        finally:
            self._remove_socket_file()


def send_releases_list(releases, socket_path=None):
    """Send releases list to a socket.

    Each release will be sent on a separate line, in the format
    "package==version". The whole list goes out as a single datagram.

    releases    -- iterable of "package==version" strings.
    socket_path -- path of the receiving Unix socket; defaults to
                   COMMUNICATION_SOCKET_PATH when omitted.

    Raises socket.error when the receiver cannot be reached.
    """
    if socket_path is None:
        socket_path = COMMUNICATION_SOCKET_PATH

    payload = "\n".join(releases)
    # Sockets carry bytes; text has to be encoded before sending
    # (a Python 2 str is already bytes and passes through unchanged).
    if not isinstance(payload, bytes):
        payload = payload.encode('utf-8')

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        sock.connect(socket_path)
        sock.send(payload)
    finally:
        # Release the descriptor even when connect/send fails.
        sock.close()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.