Source

astrosearch / GrabTweets.hs

Full commit
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

{-

Usage:

  grabtweets <search term> <interval in seconds>
  grabtweets restart
  
Aim:

Run a twitter search for the search term, saving the results to
search<n>.json. The current search state is saved to search.state
which can be used to re-start the search (by calling the routine with
the argument restart).

The interval refers to the amount of time between searches; no attempt is
made to ensure the value is large enough to avoid hitting the Twitter
rate limit.

The program errors out if started with an interval but the file
   search.state
exists, to avoid accidentally overwriting existing files.

The search term can be something like

  "aas221 or hackaas or aas 221"

-}

module Main where

import qualified Control.Exception as CE

-- import qualified Data.Text.Lazy as T
-- import Data.Text.Encoding as E
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL

-- import Network.HTTP.Enumerator (simpleHttp)
import Network.HTTP.Conduit (simpleHttp)

import Control.Concurrent (killThread, threadDelay, forkIO)
import Control.Concurrent.MVar
import Control.Monad (forM_, join, when)
import Control.Applicative ((<$>), (<*>))

import Data.Aeson
-- import qualified Data.Attoparsec.Lazy as L

import qualified Data.Vector as V
-- import qualified Data.Map as M
import qualified Data.HashMap.Strict as HM
-- import qualified Data.HashMap.Lazy as HM

import Data.Time.Clock (UTCTime, getCurrentTime, diffUTCTime, addUTCTime)

import System.Environment (getArgs, getProgName)
import System.Exit (exitFailure)
import System.IO (stdout, stderr, hPutStrLn, hFlush)
import System.Directory (removeFile, doesFileExist)

import Utils (maybeRead)
import Paths_astrosearch (version)
import Data.Version (showVersion)

{-
'threadDelay' takes its interval in microseconds as an 'Int', so the
longest single sleep is bounded; with a 32-bit Int that is 2147 seconds
plus change (as on OS-X).
-}

-- | Longest single sleep, in whole seconds, that 'threadDelay' can take.
--
-- Integer division is deliberate: the previous version divided via
-- 'Float', whose 24-bit mantissa cannot represent a 64-bit
-- @maxBound :: Int@ exactly and can round UP, so the reconstructed
-- microsecond count in 'waitBounded' could overflow 'Int'.
maxDelay :: Int
maxDelay = maxBound `div` 1000000

-- | Sleep for the given number of seconds; the caller must ensure the
-- value does not exceed 'maxDelay'.
waitBounded :: Int -> IO ()
waitBounded secs = threadDelay (secs * 1000000)
  
-- | Sleep for an arbitrary number of seconds, chunking the delay so that
-- each individual 'threadDelay' stays within 'maxDelay'.  (No need for
-- Integer in this script.)
wait :: Int -> IO ()
wait secs
  | secs <= maxDelay = waitBounded secs
  | otherwise = do
      waitBounded maxDelay
      wait (secs - maxDelay)
       
-- | Run the action, returning its value, or @Nothing@ if it takes longer
-- than the timeout.
timeOut :: 
  Int -- ^ timeout in seconds 
  -> IO a -- ^ action
  -> IO (Maybe a) -- ^ action's result or Nothing if it timed out
timeOut n act = do
  mv <- newEmptyMVar
  -- Worker thread: run the action and try to publish its result.
  -- tryPutMVar (not putMVar) matters: readMVar below never empties the
  -- MVar, so the loser of the race would otherwise block forever on
  -- putMVar, leaking one blocked thread per call.
  tid <- forkIO $ do
    r <- act
    _ <- tryPutMVar mv (Just r)
    return ()
  -- Watchdog thread: after n seconds try to declare a timeout.  Only if
  -- it wins the race (tryPutMVar returns True) does it kill the worker
  -- and log; if the action already finished it does nothing.
  _ <- forkIO $ do
    wait n
    timedOut <- tryPutMVar mv Nothing
    when timedOut $ do
      killThread tid
      logError "IO action timed out"
  readMVar mv
  
-- | Base URL of the (v1) Twitter search API endpoint that every search
-- and refresh URL in this program is built on.
twBase :: BC.ByteString  
twBase = "http://search.twitter.com/search.json"

-- | Fetch the given search URL, returning the response body, or
-- @Nothing@ (after logging the exception) if the request fails.
runTweetSearch :: BC.ByteString -> IO (Maybe BL.ByteString)
runTweetSearch sURL =
  CE.handle fallback (Just <$> simpleHttp (BC.unpack sURL))
  where
    -- best-effort: a failed request is logged, not fatal
    fallback e = logException e >> return Nothing

-- | Run the search with a timeout (in seconds).  @Nothing@ means the
-- request either timed out or failed outright.
searchTweets :: Int -> BC.ByteString -> IO (Maybe BL.ByteString)
searchTweets tout sURL = join <$> timeOut tout (runTweetSearch sURL)

convToJSON :: BL.ByteString -> Maybe Value
{-
Parse a lazy ByteString into a JSON Value, Nothing on parse failure.

Historical note: with aeson 0.3 this went through
Data.Attoparsec.Lazy's json parser directly; from aeson 0.5 onwards
'decode' does the same job.
-}

convToJSON = decode


{-
Given JSON output from a search, return the next URL
to call:

  Right refresh_url  - ie downloaded all data for this search
  Left  next_page    - have more to download

The Int is the number of results found (if any were found).

Note that I add on &rpp=100 to the refresh_url since it looks
like Twitter do not pass this value through from the initial
search.
-}

getNextURL :: BL.ByteString -> Maybe (Int, Either BC.ByteString BC.ByteString)
getNextURL inp = do
  -- pattern-match failure here (non-object JSON, or parse failure from
  -- convToJSON) falls through to Nothing via the Maybe monad
  Object jmap <- convToJSON inp
  let fromJArray (Just (Array xs)) = Just xs
      fromJArray _ = Nothing
  -- number of tweets on this page; Nothing if "results" is missing or
  -- not a JSON array
  nr <- fmap V.length $ fromJArray $ HM.lookup "results" jmap

  -- decode a URL fragment from the JSON value, prefix it with the API
  -- base, and tag it with the supplied constructor (Left/Right);
  -- relies on aeson decoding the value straight to a ByteString
  let fj c val = case fmap (c . B.append twBase) (fromJSON val) of
        Success v -> Just (nr, v)
        _ -> Nothing
  
  -- "next_page" present means more pages remain (Left); otherwise fall
  -- back to "refresh_url", which restarts the search later (Right)
  case HM.lookup "next_page" jmap of
    Just np -> fj Left np
    Nothing -> case HM.lookup "refresh_url" jmap of
      Just ru -> case fj Right ru of
        -- Twitter drops the rpp setting from refresh_url, so re-attach
        -- it here (this is ugly, TODO: clean up)
        Just (a,Right b) -> Just (a,Right (BC.append b "&rpp=100"))
        _ -> Nothing
        
      _ -> Nothing

-- | Run an IO action; any exception it throws is logged via
-- 'logException' instead of propagating.
catchError :: IO () -> IO ()
catchError = CE.handle logException
  
-- | Full state of an in-progress search; this is what gets persisted to
-- search.state (see 'saveState' / 'readState') so a search can be
-- restarted.
data SearchState = 
  SS 
  (BC.ByteString,Int) -- ^ next search URL, counter for the next output file
  String  -- ^ search term
  Int     -- ^ time between searches, in seconds
  UTCTime -- ^ time the last search was started
  deriving (Eq)
           
-- Human-readable multi-line rendering of the search state.
instance Show SearchState where
  show (SS (url, ctr) term interval lastTime) =
    unlines
      [ "Search state:"
      , "  Interval     " ++ show interval ++ " s"
      , "  Last search  " ++ show lastTime
      , "  Next file    " ++ getOutName ctr
      , "  Search term  \"" ++ term ++ "\""
      , "  URL          " ++ BC.unpack url
      ]
      
-- | Bundle the components up into a 'SearchState'.
toSearchState :: (BC.ByteString,Int) -> String -> Int -> UTCTime -> SearchState
toSearchState urlCtr term interval stamp = SS urlCtr term interval stamp

-- | Unpack a 'SearchState' into its components.
fromSearchState :: SearchState -> ((BC.ByteString,Int),String,Int,UTCTime)
fromSearchState (SS urlCtr term interval stamp) = (urlCtr, term, interval, stamp)

-- | Seconds between searches.
getInterval :: SearchState -> Int
getInterval (SS _ _ interval _) = interval

-- | Counter used to name the next output file.
getCounter :: SearchState -> Int
getCounter (SS (_, ctr) _ _ _) = ctr

-- | The term being searched for.
getSearchTerm :: SearchState -> String
getSearchTerm (SS _ term _ _) = term

-- | How many seconds to wait before starting a new search, or @Nothing@
-- if the new search can be run now.
getNextSearchTime ::
  SearchState   -- ^ state of the last search
  -> UTCTime    -- ^ current time
  -> Maybe Int  -- ^ how long to wait before the next search (@Nothing@ if search can be run now)
getNextSearchTime (SS _ _ interval lastTime) now
  | elapsed >= interval = Nothing
  | otherwise           = Just (interval - elapsed)
  where
    -- whole seconds elapsed since the last search started
    elapsed = floor (diffUTCTime now lastTime)

-- | Block until it is time for the next search (which may be now).
waitForNextSearch :: SearchState -> IO ()
waitForNextSearch state = do
  now <- getCurrentTime
  case getNextSearchTime state now of
    Nothing -> return ()
    Just secs -> do
      logMsg $ "Waiting " ++ show secs ++ " seconds before next search: " ++ show now
      wait secs

-- | Persist the search state to search.state, one field per line; any
-- write error is logged rather than thrown.
saveState :: SearchState -> IO ()
saveState (SS (url, ctr) term interval stamp) =
  catchError (B.writeFile "search.state" payload)
  where
    shown :: (Show a) => a -> BC.ByteString
    shown = BC.pack . show
    -- same five-line layout that readState expects
    payload = BC.unlines [url, shown ctr, BC.pack term, shown stamp, shown interval]

-- | Read the persisted search state back from search.state; @Nothing@
-- if the file is missing, unreadable, or does not parse.
readState :: IO (Maybe SearchState)
readState = do
  raw <- BC.readFile "search.state" `CE.catch` (\(_::CE.IOException) -> return "")
  case BC.lines raw of
    -- exactly five lines: url, counter, search term, last-search time,
    -- interval (matching the layout written by saveState)
    [url, ctr, term, time, sleep] -> return $ do
      ctrVal   <- maybeRead (BC.unpack ctr)
      sleepVal <- maybeRead (BC.unpack sleep)
      timeVal  <- maybeRead (BC.unpack time)
      return (toSearchState (url, ctrVal) (BC.unpack term) sleepVal timeVal)
    _ -> return Nothing
                 
{-
Still not convinced that the correct URL is written out / passed through
when a search returns no results.

First, deal with re-starting from search.state.
-}

-- | Run one download pass starting from the given URL/counter pair.
downloadData :: 
  Int -- ^ timeout in seconds for search to run, otherwise return the input terms
  -> (BC.ByteString, Int)   -- ^ (start URL for search, counter such that the first file will be @search<n>.json@)
  -> IO (BC.ByteString, Int) -- ^ (new URI for search, new counter); will be the same as the input if an error occurred
                     -- or no tweets were found
downloadData tOut inp@(surl, _) = do
  logMsg $ "Search URL: " ++ BC.unpack surl
  res <- getTweets tOut inp []
  case res of
    Right next -> return next
    Left ctrs -> do
      -- failure part-way through: remove any files written during this
      -- pass and hand back the original URL/counter unchanged
      forM_ ctrs (catchError . removeFile . getOutName)
      return inp
  
-- | Download pages of search results until a refresh URL is reached or
-- an error occurs, writing each non-empty page to @search<n>.json@.
getTweets ::  
  Int  -- ^ timeout for the download, in seconds
  -> (BC.ByteString, Int) -- ^ current URL and counter
  -> [Int] -- ^ the counters used during this download (accumulator, newest first)
  -> IO (Either [Int] (BC.ByteString, Int))
  -- ^ Returns the list of counters processed before an error
  -- occurred (@Left@) or the new url,counter pair on success
  -- (@Right@).
getTweets tOut (sUrl,sCtr) xs = do
  mbs <- searchTweets tOut sUrl
  case mbs of
    -- download failed/timed out: report the counters used so far so the
    -- caller can clean up the files already written
    Nothing -> return $ Left xs
    Just bs -> 
      -- a response we cannot extract a next URL from also counts as an error
      case getNextURL bs of 
        Nothing -> return $ Left xs
        Just (numTweets, enu) -> do
          -- only write the file out if there are any results
          -- but we do update the URL based on the contents since
          -- it will (in all likelihood) have changed somewhat
          let fName = getOutName sCtr
              nCtr = sCtr + 1
              hasData = numTweets > 0
          when hasData $ 
            catchError $ BL.writeFile fName bs >> logMsg ("Wrote " ++ show numTweets ++ " tweets to: " ++ fName)
            
          case enu of
            -- refresh_url: this search is complete; the counter only
            -- advances if a file was actually written
            Right nUrl -> return $ Right (nUrl, if hasData then nCtr else sCtr)
            -- next_page: recurse for the following page of results
            Left  cUrl -> getTweets tOut (cUrl, nCtr) (sCtr:xs)

-- | File name the n'th download is written to: @search<n>.json@.
getOutName :: Int -> FilePath
getOutName n = concat ["search", show n, ".json"]

-- | Build the initial search URL for a fresh search.
startUrl :: 
    String   -- ^ search term
    -> Int   -- ^ results per page (rpp)
    -> BC.ByteString
startUrl searchterm n = 
    BC.concat
      [ twBase
      , "?q=", BC.pack searchterm
      , "&include_entities=true"
      , "&rpp=", BC.pack (show n)
      ]

-- | Write the message to the screen (prefixed with @# @) and, with a
-- timestamp, to the file search.log.
logMsg :: String -> IO ()
logMsg msg = do
  putStrLn ("# " ++ msg)
  hFlush stdout
  now <- getCurrentTime
  appendFile "search.log" (show now ++ " " ++ msg ++ "\n")

-- | Record the program name and cabal version in the log.
logVersion :: IO ()
logVersion = logMsg ("Running: GrabTweets " ++ showVersion version)

-- | Log a message flagged as an error (does not exit; see 'errorOut').
logError :: String -> IO ()
logError msg = logMsg ("ERROR: " ++ msg)

-- | Log a caught exception (rendered with 'show').
logException :: CE.SomeException -> IO ()
logException exc = logMsg ("EXCEPTION: " ++ show exc)

-- | Report a fatal error to stderr and search.log, then exit with a
-- failure code.
errorOut :: String -> IO ()
errorOut emsg = do
  hPutStrLn stderr ("Error: " ++ emsg)
  hFlush stderr
  now <- getCurrentTime
  appendFile "search.log" (show now ++ " ERROR " ++ emsg ++ "\n")
  exitFailure

-- | Resume a search from the state saved in search.state.
restartSearch :: IO ()
restartSearch = do
  mstate <- readState
  case mstate of
    -- TODO: could do with more info about why the read failed
    Nothing -> errorOut "Unable to read state from search.state"
    Just st -> do
      logVersion
      logMsg $ "Restarting search - interval = " ++ show (getInterval st) ++ " seconds."
      logMsg $ "  search term = \"" ++ getSearchTerm st ++ "\""
      logMsg $ "  current counter = " ++ show (getCounter st)
      runSearch st

-- | Begin a fresh search; the second argument is the interval in
-- seconds, still as a string from the command line.
startSearch :: String -> String -> IO ()
startSearch searchterm arg =
  case maybeRead arg of
    Nothing -> errorOut $ "Unable to convert " ++ arg ++ " to an integer."
    Just step
      | step <= 0 -> errorOut $ "step must be > 0, sent " ++ show step
      | otherwise -> do
          logVersion
          logMsg $ "Starting search - interval = " ++ show step ++ " seconds."
          logMsg $ "  search term = \"" ++ searchterm ++ "\""
          -- back-date the "last search" time by one interval so the
          -- first search starts immediately (hacky; should refactor)
          now <- getCurrentTime
          runSearch $ toSearchState (startUrl searchterm 100, 1) searchterm step (addUTCTime (fromIntegral (-step)) now)

-- | Print a usage message to stderr and exit with a failure code.
usage :: IO ()
usage = do
  name <- getProgName
  hPutStrLn stderr ("Usage: " ++ name ++ " <search term> <delay in seconds between searches> | restart")
  exitFailure
           
-- | Dispatch on the command line: a single "restart" argument resumes a
-- saved search; a search term plus interval starts a new one.
main :: IO ()
main = do
  args <- getArgs
  case args of
    ["restart"] -> restartSearch

    [searchterm, delay] -> do
      -- deliberately not using readState here: we want to refuse to run
      -- even if the file is unreadable (perhaps due to a version change)
      stateExists <- doesFileExist "search.state"
      if stateExists
        then errorOut "Search term and interval given but search.state exists"
        else startSearch searchterm delay

    _ -> usage

-- | Search forever: wait until the next slot, download, persist the new
-- state, recurse.  The per-request timeout is hard-coded at 10 seconds.
runSearch :: SearchState -> IO ()
runSearch state@(SS urlCtr term interval _) = do
  waitForNextSearch state
  now <- getCurrentTime
  logMsg $ "Download started at: " ++ show now
  urlCtr' <- downloadData 10 urlCtr
  -- NOTE: the state is saved even when the download failed; comparing
  -- the returned URL with the old one would let us skip the write, but
  -- whether that is worth doing needs more thought first
  let next = toSearchState urlCtr' term interval now
  saveState next
  runSearch next