markhneedham / altnet-sydney-fsharp
Code from my F# presentation at the Alt.NET Sydney user group
Clone this repository (size: 7.3 KB): HTTPS / SSH
$ hg clone http://bitbucket.org/markhneedham/altnet-sydney-fsharp/
| commit 0: | dfcbc2acf486 |
| branch: | default |
| tags: | tip |
code from altnet-sydney presentation
9 months ago
Changed (Δ9.2 KB):
raw changeset »
AfterObjects.fs (52 lines added, 0 lines removed)
BeforeObjects.fs (41 lines added, 0 lines removed)
MessageProcessing.fs (49 lines added, 0 lines removed)
Twitter.fs (65 lines added, 0 lines removed)
Up to file-list AfterObjects.fs:
1 |
#light |
|
2 |
||
3 |
open Dimebrain.TweetSharp.Fluent |
|
4 |
open Dimebrain.TweetSharp.Extensions |
|
5 |
open Dimebrain.TweetSharp.Model |
|
6 |
open Microsoft.FSharp.Core.Operators |
|
7 |
open MarkFSharp |
|
8 |
// #r "C:\Playbox\FSharpPlayground\Twitter\lib\Dimebrain.TweetSharp.dll" |
|
9 |
||
10 |
let getStatusesBefore (statusId:int64) = |
|
11 |
let friendsTimeLine = FluentTwitter.CreateRequest().AuthenticateAs(Config.user, Config.password).Statuses().OnFriendsTimeline() |
|
12 |
if(statusId = 0L) then |
|
13 |
friendsTimeLine.AsJson().Request().AsStatuses() |
|
14 |
else |
|
15 |
friendsTimeLine.Before(statusId).AsJson().Request().AsStatuses() |
|
16 |
||
17 |
type TwitterBackwardsSearch(startingTweetId:int64, tweetsSoFar:int, tweetsToTraverse:int) = |
|
18 |
member x.ShouldKeepSearching() = tweetsSoFar < tweetsToTraverse |
|
19 |
member x.LastId = startingTweetId |
|
20 |
member x.NextSearch(newStartingTweetId: int64) = |
|
21 |
new TwitterBackwardsSearch( startingTweetId = newStartingTweetId, tweetsSoFar = tweetsSoFar+20, tweetsToTraverse = tweetsToTraverse) |
|
22 |
||
23 |
let withLinks (statuses:seq<Dimebrain.TweetSharp.Model.TwitterStatus>) = |
|
24 |
statuses |> Seq.filter (fun eachStatus -> eachStatus.Text.Contains("http")) |
|
25 |
||
26 |
let print (statuses:seq<Dimebrain.TweetSharp.Model.TwitterStatus>) = |
|
27 |
for status in statuses do |
|
28 |
printfn "[%s] %s" status.User.ScreenName status.Text |
|
29 |
||
30 |
type Tweets = { TwitterStatuses: seq<TwitterStatus> } |
|
31 |
||
32 |
type Tweets with |
|
33 |
member x.print() = print x.TwitterStatuses |
|
34 |
member x.withLinks() = { TwitterStatuses = withLinks x.TwitterStatuses} |
|
35 |
||
36 |
let rec findStatuses(twitterBackwardsSearch:TwitterBackwardsSearch) (soFar: Tweets) = |
|
37 |
if(twitterBackwardsSearch.ShouldKeepSearching()) then |
|
38 |
let findOldestStatus = Seq.sortBy (fun (eachStatus:TwitterStatus) -> eachStatus.Id) >> Seq.hd |
|
39 |
let latestStatuses = getStatusesBefore twitterBackwardsSearch.LastId |
|
40 |
findStatuses (twitterBackwardsSearch.NextSearch(findOldestStatus(latestStatuses).Id)) |
|
41 |
{ TwitterStatuses = (seq { yield! latestStatuses |> withLinks; yield! soFar.TwitterStatuses }) } |
|
42 |
else |
|
43 |
soFar |
|
44 |
||
45 |
type TwitterService() = |
|
46 |
static member GetLatestTwitterStatuses(recordsToSearch) = |
|
47 |
findStatuses (new TwitterBackwardsSearch(startingTweetId = 0L,tweetsSoFar = 0 , tweetsToTraverse = recordsToSearch)) |
|
48 |
{ TwitterStatuses = [] } |
|
49 |
||
50 |
// To get the tweets run this |
|
51 |
// let m = TwitterService.GetLatestTwitterStatuses 100 |
|
52 |
// m.print() |
Up to file-list BeforeObjects.fs:
1 |
#light |
|
2 |
||
3 |
open Dimebrain.TweetSharp.Fluent |
|
4 |
open Dimebrain.TweetSharp.Extensions |
|
5 |
open Dimebrain.TweetSharp.Model |
|
6 |
open Microsoft.FSharp.Core.Operators |
|
7 |
open MarkFSharp |
|
8 |
||
9 |
let getStatusesBefore (statusId:int64) = FluentTwitter.CreateRequest().AuthenticateAs(Config.user, Config.password).Statuses() |
|
10 |
.OnFriendsTimeline().Before(statusId).AsJson().Request().AsStatuses() |
|
11 |
||
12 |
let getLatestStatuses = FluentTwitter.CreateRequest().AuthenticateAs(Config.user, Config.password).Statuses().OnFriendsTimeline() |
|
13 |
.AsJson().Request().AsStatuses() |
|
14 |
||
15 |
let withLinks (statuses:seq<Dimebrain.TweetSharp.Model.TwitterStatus>) = |
|
16 |
statuses |> Seq.filter (fun eachStatus -> eachStatus.Text.Contains("http")) |
|
17 |
||
18 |
let print (statuses:seq<Dimebrain.TweetSharp.Model.TwitterStatus>) = |
|
19 |
for status in statuses do |
|
20 |
printfn "[%s] %s" status.User.ScreenName status.Text |
|
21 |
||
22 |
let findOldestStatus (statuses:seq<Dimebrain.TweetSharp.Model.TwitterStatus>) = |
|
23 |
statuses |> Seq.sortBy (fun eachStatus -> eachStatus.Id) |> Seq.hd |
|
24 |
||
25 |
let oldestStatusId = (getLatestStatuses |> findOldestStatus).Id |
|
26 |
||
27 |
let rec findLinks (args:int64 * int * int) = |
|
28 |
match args with |
|
29 |
| (_, numberProcessed, recordsToSearch) when numberProcessed >= recordsToSearch -> ignore |
|
30 |
| (0L, numberProcessed, recordsToSearch) -> |
|
31 |
let latestStatuses = getLatestStatuses |
|
32 |
(latestStatuses |> withLinks) |> print |
|
33 |
findLinks(findOldestStatus(latestStatuses).Id, numberProcessed + 20, recordsToSearch) |
|
34 |
| (lastId, numberProcessed, recordsToSearch) -> |
|
35 |
let latestStatuses = getStatusesBefore lastId |
|
36 |
(latestStatuses |> withLinks) |> print |
|
37 |
findLinks(findOldestStatus(latestStatuses).Id, numberProcessed + 20, recordsToSearch) |
|
38 |
||
39 |
// to get the tweets run this |
|
40 |
let findStatusesWithLinks recordsToSearch = |
|
41 |
findLinks(0L, 0, recordsToSearch) |> ignore |
Up to file-list MessageProcessing.fs:
1 |
namespace MarkFSharp |
|
2 |
module MessageProcessing |
|
3 |
open Dimebrain.TweetSharp.Model |
|
4 |
open System.Threading |
|
5 |
open MarkFSharp.TwitterHelper |
|
6 |
||
7 |
type Message = Phrase of TwitterStatus | Stop |
|
8 |
||
9 |
type ILinkProcessor = |
|
10 |
abstract Send : TwitterStatus -> Unit |
|
11 |
abstract Stop : Unit -> Unit |
|
12 |
||
13 |
type LinkProcessor(callBack) = |
|
14 |
let agent = MailboxProcessor.Start(fun inbox -> |
|
15 |
let rec loop () = |
|
16 |
async { |
|
17 |
let! msg = inbox.Receive() |
|
18 |
match msg with |
|
19 |
| Phrase item -> |
|
20 |
callBack item |
|
21 |
return! loop() |
|
22 |
| Stop -> |
|
23 |
return () |
|
24 |
} |
|
25 |
loop() |
|
26 |
) |
|
27 |
interface ILinkProcessor with |
|
28 |
member x.Send(status:TwitterStatus) = agent.Post(Phrase(status)) |
|
29 |
member x.Stop() = agent.Post(Stop) |
|
30 |
||
31 |
type MainProcessor(linkProcessor:ILinkProcessor) = |
|
32 |
let agent = MailboxProcessor.Start(fun inbox -> |
|
33 |
let rec loop () = |
|
34 |
async { |
|
35 |
let! msg = inbox.Receive() |
|
36 |
match msg with |
|
37 |
| Phrase item when item |> hasLink -> |
|
38 |
linkProcessor.Send(item) |
|
39 |
return! loop() |
|
40 |
| Phrase item -> |
|
41 |
return! loop() |
|
42 |
| Stop -> |
|
43 |
return () |
|
44 |
} |
|
45 |
loop() |
|
46 |
) |
|
47 |
||
48 |
member x.Send = Seq.iter (fun (status:TwitterStatus) -> agent.Post(Phrase(status))) |
|
49 |
member x.Stop() = agent.Post(Stop) |
1 |
#light |
|
2 |
||
3 |
||
4 |
||
5 |
namespace MarkFSharp |
|
6 |
module Twitter = |
|
7 |
open Dimebrain.TweetSharp.Fluent |
|
8 |
open Dimebrain.TweetSharp.Extensions |
|
9 |
open Dimebrain.TweetSharp.Model |
|
10 |
open Microsoft.FSharp.Core.Operators |
|
11 |
open System |
|
12 |
open System.Threading |
|
13 |
open System.Text.RegularExpressions |
|
14 |
open MarkFSharp.TwitterHelper |
|
15 |
open MarkFSharp.MessageProcessing |
|
16 |
open LitJson |
|
17 |
open SharpCouch |
|
18 |
||
19 |
let getStatusesBefore (statusId:int64) = |
|
20 |
let friendsTimeLine = FluentTwitter.CreateRequest().AuthenticateAs(Config.user, Config.password).Statuses().OnFriendsTimeline() |
|
21 |
if(statusId = 0L) then |
|
22 |
friendsTimeLine.AsJson().Request().AsStatuses() |
|
23 |
else |
|
24 |
friendsTimeLine.Before(statusId).AsJson().Request().AsStatuses() |
|
25 |
||
26 |
type TwitterGateway = |
|
27 |
interface ITalkToTwitter with |
|
28 |
member x.GetTweets () = seq { yield new TwitterStatus()} |
|
29 |
and ITalkToTwitter = |
|
30 |
abstract GetTweets : Unit -> seq<TwitterStatus> |
|
31 |
||
32 |
||
33 |
let couchDb = new CouchDB("http://172.16.246.2:5984/", "sharpcouch") |
|
34 |
||
35 |
let linkProcessor = new LinkProcessor(fun status -> couchDb.CreateDocument(JsonMapper.ToJson(status)) |> ignore ) |
|
36 |
//let linkProcessor = new LinkProcessor(fun status -> printfn "Id:%s @ %s [%s]" (status.Id.ToString()) (DateTime.Now.ToLongTimeString()) (Thread.CurrentThread.ManagedThreadId.ToString()) ) |
|
37 |
let centralProcessor = new MainProcessor(linkProcessor) |
|
38 |
||
39 |
type TwitterBackwardsSearch(startingTweetId:int64, tweetsSoFar:int, tweetsToTraverse:int) = |
|
40 |
member x.ShouldKeepSearching() = tweetsSoFar < tweetsToTraverse |
|
41 |
member x.LastId = startingTweetId |
|
42 |
member x.NextSearch(newStartingTweetId: int64) = |
|
43 |
new TwitterBackwardsSearch( startingTweetId = newStartingTweetId, tweetsSoFar = tweetsSoFar+20, tweetsToTraverse = tweetsToTraverse) |
|
44 |
||
45 |
||
46 |
let rec findStatuses(twitterBackwardsSearch:TwitterBackwardsSearch) = |
|
47 |
if(twitterBackwardsSearch.ShouldKeepSearching()) then |
|
48 |
let findOldestStatus = Seq.sortBy (fun (eachStatus:TwitterStatus) -> eachStatus.Id) >> Seq.hd |
|
49 |
let latestStatuses = getStatusesBefore twitterBackwardsSearch.LastId |
|
50 |
printfn "about to send batch to mailbox processor at %s [%s]" (DateTime.Now.ToLongTimeString()) (Thread.CurrentThread.ManagedThreadId.ToString()) |
|
51 |
centralProcessor.Send(latestStatuses) |
|
52 |
findStatuses <| twitterBackwardsSearch.NextSearch(findOldestStatus(latestStatuses).Id) |
|
53 |
else |
|
54 |
centralProcessor.Stop() |
|
55 |
||
56 |
type TwitterService() = |
|
57 |
static member GetLatestTwitterStatuses(recordsToSearch) = |
|
58 |
findStatuses <| new TwitterBackwardsSearch(startingTweetId = 0L,tweetsSoFar = 0 , tweetsToTraverse = recordsToSearch) |
|
59 |
||
60 |
[<EntryPoint>] |
|
61 |
let main args = |
|
62 |
let numberOfTweets = System.Int32.Parse(args.[0]) |
|
63 |
TwitterService.GetLatestTwitterStatuses numberOfTweets |
|
64 |
0 |
|
65 |
