How to generate a complicated/nested SQL statement for a PostgreSQL upsert/merge
I have several large (billions of rows) tables that need upsert operations. I have checked #960 and #2551 and still did not find what I want. If you think this is a duplicate, please close it.
Upsert idea from the PostgreSQL official page
I just followed the idea presented on the PostgreSQL docs page: http://www.postgresql.org/docs/9.4/static/plpgsql-control-structures.html#PLPGSQL-ERROR-TRAPPING
CREATE TABLE db (a INT PRIMARY KEY, b TEXT);

CREATE FUNCTION merge_db(key INT, data TEXT) RETURNS VOID AS
$$
BEGIN
    LOOP
        -- first try to update the key
        UPDATE db SET b = data WHERE a = key;
        IF found THEN
            RETURN;
        END IF;
        -- not there, so try to insert the key
        -- if someone else inserts the same key concurrently,
        -- we could get a unique-key failure
        BEGIN
            INSERT INTO db(a,b) VALUES (key, data);
            RETURN;
        EXCEPTION WHEN unique_violation THEN
            -- Do nothing, and loop to try the UPDATE again.
        END;
    END LOOP;
END;
$$
LANGUAGE plpgsql;

SELECT merge_db(1, 'david');
SELECT merge_db(1, 'dennis');
Our modified version
Since our INSERT and UPDATE statements are dynamic and involve complex types (jsonb, hstore, etc.), we prefer to build the SQL statements on the SQLAlchemy side. Our pl/pgsql script is as follows:
-- Function: merge_record(text, text)
-- DROP FUNCTION merge_record(text, text);
CREATE OR REPLACE FUNCTION merge_record(update_expression text, insert_expression text)
  RETURNS integer AS
$BODY$
DECLARE
    row_affected integer;
BEGIN
    -- usually looping twice is enough; we cap it at 50 iterations,
    -- which should be more than sufficient
    FOR i IN 1..50 LOOP
        -- first try to update the key
        EXECUTE update_expression;
        GET DIAGNOSTICS row_affected = ROW_COUNT;
        IF row_affected > 0 THEN
            RETURN 1;
        END IF;
        -- not there, so try to insert the key
        -- if someone else inserts the same key concurrently,
        -- we could get a unique-key failure
        BEGIN
            EXECUTE insert_expression;
            RETURN 1;
        EXCEPTION WHEN unique_violation THEN
            -- Do nothing, and loop to try the UPDATE again.
        END;
    END LOOP;
    -- loop exhausted without success; return 0 explicitly rather than
    -- falling off the end of the function, which would raise an error
    RETURN 0;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;

ALTER FUNCTION merge_record(text, text)
  OWNER TO postgres;
Target script to be generated
So what I want to build on the client side is:
select merge_record(update_expression, insert_expression);
e.g.
select merge_record(
    'update target_table set s1 = ''v1'', jsonb2 = ''{"k1": "v1", "k2": "v2"}'', hstore3 = ''"k3"=>"v3", "k4"=>"v4"'' where unique_id = 45678766',
    'insert into target_table (s1, jsonb2, hstore3, unique_id) values (''v1'', ''{"k1": "v1", "k2": "v2"}'', ''"k3"=>"v3", "k4"=>"v4"'', 45678766)'
);
Problems we encountered
We have used the SQLAlchemy SQL expression language to build the SQL statements dynamically. But the problem is that the values contain many special characters, such as $, \n, etc. (some of them are HTML text, so anything is possible). As a result, invalid SQL statements get generated and exceptions still occur several times per month.
Reasons for the problems
SO we think we must miss sth. pls give some suggestion if any. related code follows as:
raw_upsert_sql_statement = "select merge_record('%s', '%s')" % (update_expression, insert_expression)
conn.execute(text(raw_upsert_sql_statement))
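The `'%s'` interpolation above breaks as soon as an expression contains a single quote or `%`. The simplest fix is to bind the two strings as parameters (e.g. `text("select merge_record(:u, :i)")` executed with the expressions as parameters, so the driver does all escaping). If the call string really must be assembled by hand, one defensive option is dollar-quoting with a tag guaranteed not to occur in the payload. A minimal sketch; `dollar_quote` is a hypothetical helper, not part of SQLAlchemy:

```python
import uuid

def dollar_quote(payload):
    """Wrap text in a PostgreSQL dollar-quoted literal.

    The tag is regenerated until it does not occur in the payload, so
    embedded quotes, $, %, and newlines cannot terminate the literal.
    """
    tag = "q"
    while "$%s$" % tag in payload:
        tag = "q" + uuid.uuid4().hex[:8]
    return "$%s$%s$%s$" % (tag, payload, tag)

# Assumed example expressions; any special characters are now safe.
update_expression = "update target_table set s1 = 'it''s 100%' where unique_id = 45678766"
insert_expression = "insert into target_table (s1, unique_id) values ('v1', 45678766)"
raw_upsert_sql_statement = "select merge_record(%s, %s)" % (
    dollar_quote(update_expression),
    dollar_quote(insert_expression),
)
```

Bound parameters remain the safer route; the sketch above only hardens the hand-built variant.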
insert_expression
dialect = postgresql.dialect()
target_table = self.__table__
stmt = target_table.insert().values(attr_value_hash)
sql_template = unicode(stmt.compile(dialect=dialect))
attr_value_hash = self._convert_safe_sql_str(attr_value_hash)
insert_expression = sql_template % (attr_value_hash)
update_expression
dialect = postgresql.dialect()
stmt = target_table.update().where(eval(self.condition_script)).values(attr_template_hash)
sql_template = unicode(stmt.compile(dialect=dialect))
attr_value_hash = self._convert_safe_sql_str(attr_value_hash)
update_expression = sql_template % (attr_value_hash)
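Instead of compiling to a template and hand-interpolating converted values, SQLAlchemy can render the literals itself via the `literal_binds` compile option, which delegates escaping to the dialect. A minimal sketch with a hypothetical stand-in for `target_table` (column names and values are illustrative):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql

# Hypothetical stand-in for target_table from the snippets above.
metadata = MetaData()
target_table = Table(
    "target_table", metadata,
    Column("unique_id", Integer, primary_key=True),
    Column("s1", String),
)

stmt = (
    target_table.update()
    .where(target_table.c.unique_id == 45678766)
    .values(s1="it's 100% html $text$")
)

# literal_binds renders the bound values inline using the dialect's own
# escaping rules, replacing the manual replace() calls shown below.
update_expression = str(
    stmt.compile(
        dialect=postgresql.dialect(),
        compile_kwargs={"literal_binds": True},
    )
)
print(update_expression)
```

Note that `literal_binds` only works for types the dialect knows how to render as literals; jsonb/hstore values may still need explicit casts or custom type handling.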
### this is probably the stupid part
def _convert_safe_sql_str(self, params):
    ret_params = {}
    log.debug("Before converting: %s" % params)
    # error: 'dict' object does not support indexing
    # fix: replace("%", "%%")
    for k, v in params.iteritems():
        try:
            if type(v) in (datetime, str, unicode):
                ret_params[k] = ('$$%s$$' % v).replace("'", "").replace("%", "%%")
            elif type(v) in (MutableList, list):
                ret_params[k] = ('$$%s$$' % v).replace("[", "{").replace("]", "}").replace("u'", "").replace("'", "").replace("%", "%%")
            elif type(v) == MutableDict:
                ret_params[k] = ('$$%s$$' % v).replace("{", "").replace("}", "").replace(":", "=>").replace("'", "").replace("%", "%%")
            elif type(v) == dict:
                ret_params[k] = ('$$%s$$' % json.dumps(v)).replace("\'", "").replace("%", "%%")
            elif type(v) in (int, bool, float):
                ret_params[k] = v
        except Exception as ex:
            log.error(ex.message)
            log.error(traceback.format_exc())
            raise
    log.debug("After converting: %s" % ret_params)
    return ret_params
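Beyond the escaping bugs, the function above silently corrupts data: stripping every `'` turns O'Brien into OBrien, and the `$$` wrapper breaks if a value itself contains `$$`. A sketch of a converter that quotes instead of deletes; `to_pg_literal` is a hypothetical helper, and the `::jsonb` cast is an assumption about the target column type:

```python
import json
from datetime import datetime

def to_pg_literal(v):
    """Render a Python value as a PostgreSQL literal (sketch).

    Strings are quoted with doubled single quotes rather than stripped,
    dicts become quoted JSON cast to jsonb, and lists become array
    literals. This preserves characters like ' and % that the
    replace() approach above silently destroys.
    """
    if isinstance(v, bool):  # must precede the int check: bool is an int
        return "TRUE" if v else "FALSE"
    if isinstance(v, (int, float)):
        return str(v)
    if isinstance(v, datetime):
        return "'%s'" % v.isoformat()
    if isinstance(v, dict):
        return "'%s'::jsonb" % json.dumps(v).replace("'", "''")
    if isinstance(v, list):
        return "ARRAY[%s]" % ",".join(to_pg_literal(x) for x in v)
    return "'%s'" % str(v).replace("'", "''")

print(to_pg_literal("O'Brien 100%"))
print(to_pg_literal({"k1": "v1"}))
```

An hstore column would need its own `k=>v` renderer with the same quote-doubling discipline; the point is that quoting, not deletion, keeps the data intact.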
Thanks a lot.
wenlong
Comments (4)
reporter marked as trivial

repo owner changed status to closed
well the reason you're having problems with special characters is that you're feeding a raw SQL statement into a stored procedure and then using essentially the equivalent of "eval". I'd recommend not doing it that way. Have your stored procedure accept only data elements, such as those json/hstore objects, and it should render all SQL itself.
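The suggestion above can be sketched as follows. The function name, parameters, and columns are all hypothetical; the key point is that the function receives values, never SQL text, so escaping is handled entirely by the driver and by pl/pgsql parameter binding:

```python
# Hypothetical server-side function: it accepts only data values and
# renders all SQL itself, as recommended above.
MERGE_TARGET_DDL = """
CREATE OR REPLACE FUNCTION merge_target(p_unique_id integer,
                                        p_s1 text,
                                        p_jsonb2 jsonb)
RETURNS void AS $fn$
BEGIN
    UPDATE target_table SET s1 = p_s1, jsonb2 = p_jsonb2
     WHERE unique_id = p_unique_id;
    IF NOT FOUND THEN
        INSERT INTO target_table (unique_id, s1, jsonb2)
        VALUES (p_unique_id, p_s1, p_jsonb2);
    END IF;
END;
$fn$ LANGUAGE plpgsql;
"""

# Client side: only values cross the wire; the DB-API driver escapes them.
# conn.execute(text("SELECT merge_target(:uid, :s1, :j)"),
#              uid=45678766, s1="any ' or % or $ text", j='{"k1": "v1"}')
```

Note this simple UPDATE / IF NOT FOUND shape still needs the retry loop under concurrency (or, on PostgreSQL 9.5+, a native INSERT ... ON CONFLICT DO UPDATE).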
repo owner: for additional help, please start a thread on the mailing list, thanks!