Source

twitter-feed / feed.py

#! /usr/bin/env python3
# -*- coding: utf-8 -*-

"""
A twitter client

based on https://dev.twitter.com/docs/api/1/get/search

Copyright (c) 2013, IIHM/LIG - Renaud Blanch <http://iihm.imag.fr/blanch/>
Licence: GPLv3 or higher <http://www.gnu.org/licenses/gpl.html>
"""


# imports ###################################################################

import sys
import getopt
import time
import calendar

from textwrap import dedent

from urllib.request import urlopen
from urllib.parse import urlencode

import json


# handling of broken pipe ####################################################

import signal
signal.signal(signal.SIGPIPE, signal.SIG_DFL)


# twitter query ##############################################################

TWITTER_URL = "http://search.twitter.com/search.json"
TWITTER_DATE_FORMAT = "%a, %d %b %Y %H:%M:%S %z"

DATE_FORMAT = "%Y-%m-%d %H:%M"
TWEET_FORMAT = "[%(created_at)s] %(from_user)s: %(text)s\n"

def twitter_date(t, date_format=DATE_FORMAT, twitter_date_format=TWITTER_DATE_FORMAT):
	t = time.strptime(t, twitter_date_format)
	t = calendar.timegm(t)
	t = time.localtime(t)
	t = time.strftime(date_format, t)
	return t

def get(url="", data=""):
	search = urlopen("".join([TWITTER_URL, url]), data.encode('utf-8'))
	info = search.info()
	assert info['Content-type'] == "application/json;charset=utf-8"
	text_response = search.read().decode('utf-8')
	return json.loads(text_response)

def query_twitter(**kwargs):
	content = get(data=urlencode(kwargs))
	
	results = content['results']
	while 'next_page' in content and not 'rpp' in kwargs:
		next_page = content['next_page']
		content = get(next_page)
		results += content['results']
	
	for tweet in reversed(results):
		tweet['created_at'] = twitter_date(tweet['created_at'])
		sys.stdout.write(TWEET_FORMAT % tweet)
	sys.stdout.flush()

	return content['max_id']


# main #######################################################################

DEFAULT_N     = 15
DEFAULT_DELAY =  1
DEFAULT_TYPE  = "recent"

def exit_usage(name, message=None, code=0):
	usage = dedent("""\
	Usage: %s [-hn:d:t:] <t0 ...>
		-h --help       this help message
		-n --number <n> retrieve at most n tweets in the past (defaults to %s)
		-d --delay  <s> time between pollings (defaults to %ss)
		-t --type   "recent"|"mixed"|"popular"  type of tweets (defaults to '%s')
		<t0>, ...   query terms
	""")
	if message:
		sys.stderr.write("%s\n" % message)
	sys.stderr.write(usage % (name, DEFAULT_N, DEFAULT_DELAY, DEFAULT_TYPE))
	sys.exit(code)


def main(argv=[__name__]):
	name, *args = argv
	
	try:
		options, args = getopt.getopt(args, "hn:d:t:", ["help", "number=", "delay=", "type="])
	except getopt.GetoptError as message:
		exit_usage(name, message, 1)
	
	if len(args) < 1:
		exit_usage(name, "at least one argument is expected", 1)
	
	n           = DEFAULT_N
	result_type = DEFAULT_TYPE
	delay       = DEFAULT_DELAY
	
	for opt, value in options:
		if opt in ["-h", "--help"]:
			exit_usage(name)
		elif opt in ["-n", "--number"]:
			n = int(value)
		elif opt in ["-t" "--type"]:
			result_type = value
		elif opt in ["-d", "--delay"]:
			delay = int(value)
	
	query = {
		"rpp":         n,
		"result_type": result_type,
		"q":           " ".join(args),
	}
	
	while True:
		try:
			max_id = query_twitter(**query)
			query['since_id'] = max_id
			query.pop('rpp', None)
			time.sleep(delay)
		except KeyboardInterrupt:
			break


if __name__ == "__main__":
	sys.exit(main(sys.argv))
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.