Snippets

Mário Almeida Playdrone metadata to postgres

Created by Mário Almeida last modified
#!/usr/bin/env ruby
# encoding: UTF-8

require 'rubygems'
require 'json'
require 'pg'

# NOTE: This script requires PostgreSQL, a database called crawler, and, as its only
#       argument, the path to a Playdrone snapshot file, which can be downloaded with:
#              wget http://archive.org/download/playdrone-snapshots/2014-10-31.json
#
# Post-mortem comments:
# The JSON is too big to be parsed at once, so this script reads it line by line
# and breaks it into subsets of apps, roughly delimited by counting brackets.
# The validity of each subset is confirmed with a rather hackish regex that can likely be skipped.
# Each subset is fed into a JSON parser and then dumped into a DB with each field as a column.
# Not the prettiest piece of code, but it works...
# Also note that while it is written to use threads, we all know Ruby threads... :)

# Path to the Playdrone snapshot JSON, taken from the first command-line argument.
app_list = ARGV[0]

# Without an input file there is nothing to do. `abort` writes the message to
# stderr and terminates with a non-zero status, so a calling shell or script
# can tell the usage error apart from a successful run.
abort "---->Missing argument : playdrone app list" if app_list.nil?

# Reopens the core Time class to add a millisecond-resolution epoch helper.
class Time
  # Returns the time as whole milliseconds since the Unix epoch (Integer).
  # The sub-millisecond fraction is dropped, not rounded.
  def to_ms
    milliseconds = to_f * 1000.0
    milliseconds.to_i
  end
end

#--------------------------------#
# Tunable settings               #
#--------------------------------#
# Size of one batch: how many apps are handed to a single worker thread.
max_apps = 1_000
# How many worker threads are started before the main loop waits for them.
max_threads = 2
# Rough size of the snapshot (about 1.1M apps). Nothing else in this script reads it.
number_of_estimated_apps = 1_100_000
#--------------------------------#

#--------------------------------#
# Internal state - leave alone   #
#--------------------------------#
# -- Regexes --
# Hack to validate a batch: captures exactly max_apps comma-separated {...}
# objects, which also strips any stray leading/trailing commas around them.
r = /({([\s]*".*$)+[\s]*}([\s]*,[\s]*{([\s]*".*$)+[\s]*}){#{max_apps -1}})/
# Matches a line on which a brace appears between double quotes.
rr = /".*[{}]+.*"/

# -- Brace tracking --
# Current number of open brackets.
count_open = 0
# Previous bracket count (not referenced by the rest of this script).
count_previous = 0
# True once at least one "{" has been seen for the app currently being read.
parsed_one_bracket = false

# -- Batch accumulation --
# Apps collected so far for the current batch.
num_apps = 0
# Raw text of the apps collected so far.
entries = ""
# Lines read from the input file so far.
line_number = 0
# Apps already handed to worker threads.
number_of_processed_apps = 0

# -- Threading --
# Worker threads launched since the last join.
threads = []
# One database connection per worker thread.
connections = []
# Data-copy mutex, per-thread data and current thread id
# (none of the three is referenced by the rest of this script).
copydata_mutex = Mutex.new
thread_data = []
thread_id = 0
#--------------------------------#
#--------------------------------#

# The import drops and recreates the playdrone table, so ask for explicit consent first.
puts "---->Executing this script will delete the playdrone table. Are you sure you want this? (Y/N)"
# STDIN.gets returns nil when stdin is closed or already at EOF (for example in a
# non-interactive run). to_s turns that into an empty answer, which counts as "no",
# instead of raising NoMethodError on nil.chomp.
res = STDIN.gets.to_s.chomp

# Anything other than an explicit Y or y cancels the run.
unless %w[Y y].include?(res)
        puts "---->Canceling"
        exit
end

# Reference point for the per-round timing printed by the main loop.
start_time = Time.now

# Open one connection per worker thread. Workers are indexed 0..max_threads-1
# (the index is the number of threads already launched in the current round),
# so exactly max_threads connections are needed; the previous inclusive range
# 0..max_threads opened one extra connection that was never used.
max_threads.times do |slot|
        connections[slot] = PG.connect( dbname: 'crawler' )
end

# Recreate the destination table on a short-lived connection of its own and
# close that connection afterwards, whether or not the DDL succeeded.
conn = PG.connect( dbname: 'crawler' )
begin
        conn.exec("DROP TABLE IF EXISTS playdrone;")

        # (app_id, category) is the primary key, so the same app may appear once per category.
        conn.exec(<<-SQL)
                CREATE TABLE playdrone(
                        app_id TEXT NOT NULL,
                        title TEXT NOT NULL,
                        developer_name TEXT NOT NULL,
                        category TEXT NOT NULL,
                        free BOOL NOT NULL,
                        version_code INT NOT NULL,
                        version_string TEXT NOT NULL,
                        installation_size INT8 NOT NULL,
                        downloads INT NOT NULL,
                        star_rating REAL NOT NULL,
                        snapshot_date TEXT NOT NULL,
                        metadata_url TEXT NOT NULL,
                        apk_url TEXT NOT NULL,
                        PRIMARY KEY (app_id, category)
                );
        SQL
ensure
        conn.close
end

# Main import loop.
#
# Streams the snapshot line by line, cuts it into batches of max_apps apps by
# tracking brace depth, and hands every full batch to a worker thread that
# validates it with the regex `r`, parses it as JSON and inserts one row per app.
# After every max_threads launched workers the main loop joins them and prints timing.

# Column names of the playdrone table; each one is also the key of the same
# name in an app's JSON object, so the list drives both the statement and the values.
insert_columns = %w[category app_id title developer_name free version_code version_string
                    installation_size downloads star_rating snapshot_date metadata_url apk_url]
insert_placeholders = (1..insert_columns.size).map { |n| "$#{n}" }.join(", ")
# Values are bound as parameters rather than interpolated into the SQL text, so
# quotes or other special characters in the crawled data cannot break or alter
# the statement.
insert_cmd = "INSERT INTO playdrone (#{insert_columns.join(', ')}) VALUES (#{insert_placeholders});"

File.open(app_list, "r") do |f|
        f.each_line do |line|

                line_number += 1

                # Track brace depth. Lines on which a brace appears between double
                # quotes (rr) are ignored. A line containing "}" closes one level;
                # otherwise a line containing "{" opens one.
                if line.include?("}") && line !~ rr
                        count_open -= 1
                elsif line.include?("{") && line !~ rr
                        count_open += 1
                        parsed_one_bracket = true
                end

                # skip first line (it is still counted for brace depth above)
                next unless line_number > 1

                # aggregate lines (appended in place; `entries` is replaced by a
                # fresh string whenever a batch is handed to a worker)
                entries << line

                # Depth back at zero after at least one "{" means one complete app was read.
                if count_open == 0 && parsed_one_bracket
                        num_apps += 1
                        parsed_one_bracket = false

                        # once we have max_apps apps, parse them
                        if num_apps == max_apps

                                # The worker receives the batch text and its slot index as
                                # thread arguments; the index selects its own connection.
                                worker = Thread.new(entries, threads.size) do |app_data, conn_index|
                                        # a connection cannot be shared across threads
                                        worker_conn = connections[conn_index]

                                        # Extract exactly max_apps objects from the batch text.
                                        json = r.match(app_data)
                                        unless json
                                                puts "---->Couldnt match data! Last line number : #{line_number}"
                                                puts r
                                                puts app_data
                                                exit(1)
                                        end

                                        # Wrap the matched objects in brackets to form a JSON array.
                                        app_data = "[#{json[1]}]"
                                        begin
                                                data = JSON.parse(app_data)
                                        rescue JSON::ParserError
                                                puts "---->JSON::ParserError - Printing current data: "
                                                puts app_data
                                                exit(1)
                                        end

                                        data.each do |app|
                                                # to_s mirrors what string interpolation did to each
                                                # value before; a missing field becomes an empty string.
                                                values = insert_columns.map { |column| app[column].to_s }

                                                begin
                                                        worker_conn.exec_params(insert_cmd, values)
                                                rescue PG::UniqueViolation
                                                        # Duplicate (app_id, category): report it and move on.
                                                        puts "---->PG::UniqueViolation - Failed to insert #{app}"
                                                        next
                                                rescue PG::SyntaxError
                                                        puts "---->PG::SyntaxError - Printing current data:"
                                                        puts app.inspect
                                                        exit(1)
                                                rescue PG::NumericValueOutOfRange
                                                        puts "---->PG::NumericValueOutOfRange - Printing current data:"
                                                        puts app.inspect
                                                        exit(1)
                                                end
                                        end
                                end
                                threads << worker

                                number_of_processed_apps += max_apps

                                num_apps = 0
                                entries = ""
                        end
                end

                # A line with both "{" and "}" only had its "}" counted above, so count
                # the "{" here and start the next app's text with an opening brace.
                # NOTE(review): presumably this is a "}, {" separator line between two
                # apps - confirm against the snapshot format.
                if line.include?("{") && line.include?("}") && line !~ rr
                        count_open += 1
                        parsed_one_bracket = true
                        entries << "{\n"
                end

                # Sanity check on the depth estimate.
                if count_open < 0 || count_open > max_apps
                        puts "---->Incorrect number of open brackets -> #{count_open}"
                        exit(1)
                end

                # Once max_threads workers have been launched, wait for all of them
                # and report how long the round took.
                if threads.size > 0 && (threads.size % max_threads == 0)
                        puts "---->Thread join on threads"
                        threads.each { |thr| thr.join }
                        end_time = Time.now
                        threads = []
                        puts "---->Processed #{number_of_processed_apps} apps, time for #{max_threads*max_apps} apps = #{end_time.to_ms - start_time.to_ms}"
                        start_time = Time.now
                end
        end
end

# Workers launched after the last periodic join are still running here; wait for
# them so the process does not exit while their inserts are in flight.
unless threads.empty?
        puts "---->Thread join on threads"
        threads.each { |thr| thr.join }
        threads = []
end

# Batches are only dispatched when they reach exactly max_apps apps (the
# validation regex requires that count), so a shorter final batch is never
# inserted. Report it instead of dropping it silently.
# TODO: process the final partial batch as well.
if num_apps > 0
        puts "---->Warning: the last #{num_apps} apps did not fill a batch of #{max_apps} and were not inserted"
end

Comments (0)

HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.