How to generate a complicated/nested SQL statement for Postgres upsert/merge

Issue #3384 closed
wenlong_lu created an issue

I have several large (billions of rows) tables which need upsert operations. I have checked #960 and #2551 and still did not get what I want. If you think this is a duplicate, please close it.

Upsert idea from the PostgreSQL official page

I just followed the idea presented on the PostgreSQL documentation page: http://www.postgresql.org/docs/9.4/static/plpgsql-control-structures.html#PLPGSQL-ERROR-TRAPPING

CREATE TABLE db (a INT PRIMARY KEY, b TEXT);

CREATE FUNCTION merge_db(key INT, data TEXT) RETURNS VOID AS
$$
BEGIN
    LOOP
        -- first try to update the key
        UPDATE db SET b = data WHERE a = key;
        IF found THEN
            RETURN;
        END IF;
        -- not there, so try to insert the key
        -- if someone else inserts the same key concurrently,
        -- we could get a unique-key failure
        BEGIN
            INSERT INTO db(a,b) VALUES (key, data);
            RETURN;
        EXCEPTION WHEN unique_violation THEN
            -- Do nothing, and loop to try the UPDATE again.
        END;
    END LOOP;
END;
$$
LANGUAGE plpgsql;

SELECT merge_db(1, 'david');
SELECT merge_db(1, 'dennis');

Our modified version

Since our insert and update statements are dynamic and involve complex types (jsonb, hstore, etc.), we prefer to build the SQL statements on the SQLAlchemy side. Our plpgsql script follows:

-- Function: merge_record(text, text)
-- DROP FUNCTION merge_record(text, text);

CREATE OR REPLACE FUNCTION merge_record(update_expression text, insert_expression text)
  RETURNS integer AS
$BODY$
declare
  row_affected integer;
BEGIN
    -- usually looping twice is enough; we cap it at 50 iterations
    -- to be safe
    FOR i IN 1..50 LOOP
        -- first try to update the key
        execute update_expression;
        GET DIAGNOSTICS row_affected = ROW_COUNT;
        IF row_affected > 0 THEN
            RETURN 1;
        END IF;
        -- not there, so try to insert the key
        -- if someone else inserts the same key concurrently,
        -- we could get a unique-key failure
        BEGIN
            execute insert_expression;
            RETURN 1;
        EXCEPTION WHEN unique_violation THEN
            -- Do nothing, and loop to try the UPDATE again.
        END;
    END LOOP;
    -- if the loop somehow exhausts all 50 attempts, return explicitly;
    -- otherwise plpgsql raises "control reached end of function without RETURN"
    RETURN 0;
END;
$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;
ALTER FUNCTION merge_record(text, text)
  OWNER TO postgres;

Target script to be generated

So what I want to build on the client side is:

select merge_record(update_expression, insert_expression);
e.g.
select merge_record(
     $q$update target_table set s1='v1', jsonb2='{"k1": "v1", "k2": "v2"}', hstore3='"k3"=>"v3", "k4"=>"v4"' where unique_id=45678766$q$,
     $q$insert into target_table (s1, jsonb2, hstore3, unique_id) VALUES ('v1', '{"k1": "v1", "k2": "v2"}', '"k3"=>"v3", "k4"=>"v4"', 45678766)$q$
);

Problems we encountered

We have used the SQLAlchemy SQL expression language to build the SQL statements dynamically. But the problem is that the values contain many special characters, like $, \n, etc. (some of them are HTML text, so anything is possible). Exceptions from invalid generated SQL statements still happen several times per month.
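One concrete failure mode, for what it's worth: $$ dollar-quoting (as used in the converter below) is terminated early whenever a value itself contains $$, and with arbitrary HTML that will eventually happen. A minimal sketch of picking a quote tag that cannot collide with the payload (the helper name is illustrative):

import uuid

def dollar_quote(value):
    # choose a tag that does not occur in the value, so a payload
    # containing "$$" (or the tag itself) cannot end the literal early
    tag = "q" + uuid.uuid4().hex[:8]
    while ("$%s$" % tag) in value:
        tag = "q" + uuid.uuid4().hex[:8]
    return "$%s$%s$%s$" % (tag, value, tag)

Even with safe quoting, though, the values still travel inside SQL text.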

Reasons for the problems

So we think we must be missing something. Please give us suggestions if you have any. The related code follows:

raw_upsert_sql_statement = "select merge_record('%s', '%s')" % (update_expression, insert_expression)
conn.execute(text(raw_upsert_sql_statement))
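One variant we could try (a sketch, assuming we keep the two-expression design): let the driver bind the two strings as parameters instead of %-interpolating them, so merge_record receives them byte-for-byte without any escaping on our side:

from sqlalchemy import text

stmt = text("select merge_record(:update_expr, :insert_expr)")
conn.execute(stmt, update_expr=update_expression, insert_expr=insert_expression)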

insert_expression
        dialect = postgresql.dialect()
        target_table = self.__table__
        stmt = target_table.insert().values(attr_value_hash)
        sql_template = unicode(stmt.compile(dialect=dialect))
        attr_value_hash = self._convert_safe_sql_str(attr_value_hash)
        insert_expression = sql_template % (attr_value_hash)

update_expression
        dialect = postgresql.dialect()
        stmt = target_table.update().where(eval(self.condition_script)).values(attr_template_hash)
        sql_template = unicode(stmt.compile(dialect=dialect))
        attr_value_hash = self._convert_safe_sql_str(attr_value_hash)
        update_expression = sql_template % (attr_value_hash)

### this should be the stupid part
def _convert_safe_sql_str(self, params):
        ret_params = {} 
        log.debug("Before converting: %s" % params)

        # error:'dict' object does not support indexing
        # fix:  replace("%", "%%")
        for k, v in params.iteritems():
            try:
                if type(v) in (datetime, str, unicode):
                    ret_params[k] = ('$$%s$$' % v).replace("'", "").replace("%", "%%")
                elif type(v) in (MutableList, list):
                    ret_params[k] = ('$$%s$$' % v).replace("[", "{").replace("]", "}").replace("u'", "").replace("'", "").replace("%", "%%")
                elif type(v) == MutableDict:
                    ret_params[k] = ('$$%s$$' % v).replace("{", "").replace("}", "").replace(":", "=>").replace("'", "").replace("%", "%%")
                elif type(v) == dict:
                    ret_params[k] = ('$$%s$$' % json.dumps(v)).replace("\'", "").replace("%", "%%")
                elif type(v) in (int, bool, float):
                    ret_params[k] = v
            except Exception as ex:
                log.error(ex.message)
                log.error(traceback.format_exc())
                raise ex

        log.debug("After converting: %s" % ret_params)
        return ret_params
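For comparison, a rough sketch that delegates literal escaping to the driver instead of hand-replacing quotes, assuming psycopg2 is the DBAPI in use (mogrify() renders exactly the statement the driver would send; hstore/json values would additionally need psycopg2's register_hstore()/Json adapters, which is an assumption here):

from sqlalchemy.dialects import postgresql

def render_expression(conn, stmt):
    # compile to psycopg2's %(name)s placeholders, then let the driver
    # escape the bound values itself
    compiled = stmt.compile(dialect=postgresql.dialect())
    cursor = conn.connection.cursor()  # raw DBAPI cursor
    try:
        return cursor.mogrify(unicode(compiled), compiled.params)
    finally:
        cursor.close()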

Thanks a lot.

wenlong

Comments (4)

  1. Mike Bayer repo owner

    Well, the reason you're having problems with special characters is that you're feeding a raw SQL statement into a stored procedure and then using essentially the equivalent of "eval". I'd recommend not doing it that way. Have your stored procedure accept only data elements, such as those json/hstore objects, but have it render all SQL itself.
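A sketch of that direction (the reworked signature and parameter names are illustrative, not from this issue): merge_record takes the key and the data values directly, renders its own UPDATE/INSERT internally, and the client passes everything as bound parameters:

import json
from sqlalchemy import text

# hypothetical signature:
#   merge_record(unique_id bigint, s1 text, jsonb2 jsonb, hstore3 hstore)
stmt = text("select merge_record(:unique_id, :s1, "
            "cast(:jsonb2 as jsonb), cast(:hstore3 as hstore))")
conn.execute(
    stmt,
    unique_id=45678766,
    s1="v1",
    jsonb2=json.dumps({"k1": "v1", "k2": "v2"}),
    hstore3='"k3"=>"v3", "k4"=>"v4"',
)

No SQL text ever travels as data this way, so special characters in the values stop mattering.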
