Source

astrosearch / CalcHistogram.hs

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

{-

Usage:

  calchistogram <ingraph>
  calchistogram json <ingraph>

Aim:

Calculate histograms of the tweets, separating out tweets from
re-tweets, and using a simple binning and a KDE.

At present all tweets are included (i.e. no restriction on hash tag).

-}

module Main where

import qualified Data.ByteString.Lazy.Char8 as LB
import qualified Data.Vector.Unboxed as UV
import qualified Data.Vector as V
import qualified Statistics.Sample.KernelDensity as K
import qualified Statistics.Sample.Histogram as H

import Data.Aeson
import Data.Either (partitionEithers)
import Data.Time (UTCTime)
import Database.HaSparqlClient (Query, NamedGraph, BindingValue(..))

import System.Environment (getArgs, getProgName)
import System.Exit (exitFailure, exitSuccess)
import System.IO (stderr, hPutStrLn)

import SPARQL (FromBinding(..), makeQuery, queryStore, fromStores, getTimeRange)
import TimeUtils (readTimeString, timeToDbl)

readCreated :: String -> UTCTime
readCreated = readTimeString "%b %d %T %z %Y" . drop 4

{-
Extract the tweet information in temporal order.
-}
queryTweets :: [NamedGraph] -> Query
queryTweets ngs = 
  unwords
  [ "prefix sioct: <http://rdfs.org/sioc/types#>"
  , "prefix dcterms: <http://purl.org/dc/terms/>"
  , "prefix tw: <http://purl.org/net/djburke/demo/twitter#>"
  , "SELECT ?time ?retweet "
  , fromStores ngs
  , " WHERE { "
  , "  ?tweet a sioct:MicroblogPost ; dcterms:created ?time ."
  , "  OPTIONAL { ?tweet tw:isRetweet ?retweet }"
  , "}"
  , "ORDER BY ASC(?time)"
  ]

{-
Run the query against the given endpoint, collecting
up the results. The two return values are the
tweet times and the re-tweet times.
-}
query :: String -> IO ((UV.Vector Double, UV.Vector Double), (UTCTime, UTCTime))
query endpoint = do
  stores <- queryStore endpoint
  ts <- getTimeRange endpoint stores
  res <- makeQuery getVals endpoint (queryTweets stores)
  return (splitVals res, ts)

getVals :: [BindingValue] -> Maybe (Double, Bool)
getVals (timebv:flagbv:[]) = do
  t <- fromBinding timebv
  f <- if flagbv == Unbound 
       then return False
       else case fromBinding flagbv of
         Just True -> return True
         _ -> return False
  return (timeToDbl t, f)
getVals _ = Nothing                 

splitVals :: [(Double, Bool)] -> (UV.Vector Double, UV.Vector Double)
splitVals vs = let
  (rtimes, ttimes) = partitionEithers $ map (\(t,f) -> if f then Left t else Right t) vs
  in (UV.fromList ttimes, UV.fromList rtimes)

newtype KDE = K (Double, Double)
              deriving (Eq, Show)
                       
instance ToJSON KDE where                       
  toJSON (K (x,y)) = object ["x" .= x, "y" .= y]
  
toKDEs :: (UV.Vector Double, UV.Vector Double) -> V.Vector KDE
toKDEs = V.map K . UV.convert . uncurry UV.zip

timeFilter :: Double -> Double -> UV.Vector Double -> UV.Vector Double
timeFilter tlo thi = UV.filter (\t -> t >= tlo && t < thi)

kdeFilter :: Double -> Double -> V.Vector KDE -> V.Vector KDE
kdeFilter tlo thi = V.filter (\(K (t,_)) -> t >= tlo && t < thi)

kdeNorm :: Int -> V.Vector KDE -> V.Vector KDE
kdeNorm n = V.map (\(K (t,v)) -> K (t, fromIntegral n * v))

{-
Since calcHist and calcKDE are called twice the calculations
could be lifted out and shared, but ignore that for now.
-}

{-
Calculate the histogram for number of points,
fitlering the times between tlo and thi, with the
given spacing.
-}
calcHist :: 
  Double     -- minimum time
  -> Double  -- maximum time
  -> Double  -- bin size
  -> UV.Vector Double 
  -> V.Vector KDE
calcHist tlo thi dt ts = 
  let delta = thi - tlo
      npts0 = delta / dt
      hn = floor npts0 + 1
      
      thi' = tlo + fromIntegral hn * dt
      
      xs = UV.enumFromStepN (tlo + dt/2) dt hn
      hs = H.histogram_ hn tlo thi' $ timeFilter tlo thi' ts
      
  in V.map K $ UV.convert $ UV.zip xs hs

{-
Since K.kde_ takes the number of points and rounds it up to
the next power of 2 we need to go to some effort to get
the required spacing.
-}
calcKDE ::
  Double  -- minimum time
  -> Double  -- maximum time
  -> Double  -- bin size
  -> UV.Vector Double 
  -> V.Vector KDE
calcKDE tlo thi dt ts =
  let total = thi - tlo
      npts0 = total / dt
      
      gp :: Double -> Int
      gp = floor . logBase 2
      n0 = 2 ^ gp npts0
      
      npts = if fromIntegral n0 >= npts0 then n0 else 2*n0
      
      -- looks like we have to deal with a half-bin width since start/end
      -- points may refer to the middle of the bin
      -- hdelta = (fromIntegral npts * dt - total) / 2
      hdelta = (fromIntegral npts * dt - total - dt) / 2

      n = UV.length $ timeFilter tlo thi ts
      
      tlo' = tlo - hdelta
      thi' = thi + hdelta
      
  in (kdeNorm n . kdeFilter tlo thi . toKDEs . K.kde_ npts tlo' thi' . timeFilter tlo' thi') ts
  
-- AAS221 streaming data starts at
--    2013-01-02 14:20:56.941684 UTC
--
getStats :: UV.Vector Double -> UV.Vector Double -> (Double, V.Vector KDE, V.Vector KDE, V.Vector KDE, V.Vector KDE)
getStats tws rtws = 
  let tlo = timeToDbl $ readCreated "Wed Jan 02 00:00:00 -0600 2013"
      thi = timeToDbl $ readCreated "Wed Jan 23 00:00:00 -0600 2013"
  
      binsize = 120.0
  
  in ( binsize
     , calcKDE tlo thi binsize tws
     , calcKDE tlo thi binsize rtws
     , calcHist tlo thi binsize tws
     , calcHist tlo thi binsize rtws )

showKDE4 :: (KDE, KDE, KDE, KDE) -> String
showKDE4 (K (x1,y1), K (_,y2), K (_,y3), K (_,y4)) = 
  unwords $ map show [x1, y1, y2, y3, y4]
  
doitSimple :: (UV.Vector Double, UV.Vector Double) -> IO ()
doitSimple (ttimes, rtimes) = do
  let (binsize, tweets, retweets, htweets, hretweets) =
        getStats ttimes rtimes
        
  putStrLn $ "binsize: " ++ show binsize
  V.forM_ (V.zip4 tweets retweets htweets hretweets) $ putStrLn . showKDE4
  
doitJSON :: ((UV.Vector Double, UV.Vector Double), (UTCTime, UTCTime)) -> IO ()
doitJSON ((ttimes, rtimes), ts) = 
  let (binsize, tweets, retweets, htweets, hretweets) =
        getStats ttimes rtimes
        
  in LB.putStrLn $ encode $ object
     [ "binsize"    .= binsize
     , "tweets"     .= tweets
     , "retweets"   .= retweets
     , "htweets"    .= htweets
     , "hretweets"  .= hretweets
     , "firstTweet" .= fst ts
     , "lastTweet"  .= snd ts
     ]
    
usage :: IO ()
usage = do
  pName <- getProgName
  hPutStrLn stderr $ "Usage: " ++ pName ++ " [json] <endpoint>"
  exitFailure
      
main :: IO ()
main = do
  args <- getArgs
  case args of
    [endpoint] -> query endpoint >>= doitSimple . fst >> exitSuccess
      
    (j:endpoint:[]) | j == "json" -> query endpoint >>= doitJSON >> exitSuccess
      
    _ -> usage