Snippets

naikoto libctx: Lua coroutine yield from sqlite3_step progress handler

Created by naikoto last modified
local ffi = require 'ffi'
local ctx = require 'ctx'
local sqlite = require 'sqlite'

-- Row limit bound to the @x parameter of the recursive query; the same number
-- is passed to db:progress_handler below.  Override it with the X environment
-- variable.
local x = tonumber(os.getenv 'X') or 1000

local db = sqlite.open ':memory:'
local stmt = db:query [[
 with recursive r(x) as (select 1 union all select x+1 from r limit @x)
 select count(x) from r group by r.x % 4
]]:ubind(x)

-- ctx.new returns nil when the stack cannot be created, so fail here with a
-- readable message instead of "attempt to call a nil value" on the next line.
local stack = assert(ctx.new(0x4000), 'ctx.new failed')
-- Calling the stack prepares a yield state for sqlite3_step(stmt).
local yield = stack(sqlite.C.sqlite3_step, stmt)

-- Optional caller-owned stack buffer.  It is declared local (and left nil) so
-- the `buf` checks further down never fall through to a global lookup.
-- Uncomment the two lines below to run on a buffer instead of `stack`.
local buf
--buf = ffi.new('uint64_t[?]', 0x4000 / 8);
--yield = ctx.set(buf, ffi.sizeof(buf), sqlite.C.sqlite3_step, stmt)

-- The raw C ctx_yield is installed as the progress callback with the yield
-- state as its argument.
db:progress_handler(x, ctx.yield, yield)

print('Start', yield)

-- Simulated event loop.  Each resume gives the sqlite3_step context one more
-- slice: a falsy yield() means the step is still in progress, a truthy one
-- means sqlite3_step returned and its code is available in yield.result.
local event_loop = coroutine.wrap(function()
  while true do
    local finished = yield()

    if not finished then
      -- still inside sqlite3_step
      io.write('.'); io.flush()
    else
      local rc = yield.result

      if rc ~= sqlite.C.SQLITE_ROW then
        -- Either the statement is exhausted or it failed; both end the loop.
        if rc == sqlite.C.SQLITE_DONE then
          print('VM steps', stmt:vm_steps())
          print('Memory', sqlite.memory())
        else
          print('Error', db:errmsg())
        end
        return
      end

      -- A row is ready: print it and re-arm the yield state for the next step.
      io.write(' ', stmt:get(0), '\n')
      stack(sqlite.C.sqlite3_step, stmt)
      if buf then ctx.set(buf, ffi.sizeof(buf), sqlite.C.sqlite3_step, stmt) end
    end

    coroutine.yield(true)
  end
end)

-- Keep resuming until the coroutine finishes (it then returns nothing).
while event_loop() do end

do
  -- Report stack usage by counting untouched (zero) 8-byte words from the
  -- lowest address of the stack memory upwards.
  -- NOTE(review): assumes the stack memory starts out zero-filled - confirm
  -- against ctx_new / the buffer allocation.
  local sz = buf and ffi.sizeof(buf) or #stack
  local mem = buf or stack:ptr()
  -- Bound the scan so an entirely untouched stack cannot make it read past
  -- the end of the allocation.
  local words = math.floor(sz / 8)
  local zr = 0
  while zr < words and mem[zr] == 0 do zr = zr + 1 end
  print('Stack', sz - zr * 8, sz)
end
--- libctx yield support ---

local ffi = require 'ffi'
local C = ffi.load 'ctx'
local M = { C = C }

-- C declarations for libctx and its yield extension.
-- `step` is declared unsigned to match the C header's `unsigned int step`.
-- NOTE(review): `__opaque[148]` stands in for the header's
-- `ctx_state state[2]; ctx_extra extra[2];` members; confirm against ctx.h
-- that it is at least as large as those.
ffi.cdef [[
struct ctx_stack { void *stack; size_t size; };
struct ctx_yield_state { int result; unsigned int step; int64_t __opaque[148]; };
int ctx_new(struct ctx_stack*);
void ctx_free(struct ctx_stack);
struct ctx_yield_state* ctx_stack_yield_state(struct ctx_stack*, void*, void*);
int ctx_yield(struct ctx_yield_state*);
]]

-- Raw C entry point, exported unwrapped so it can be handed to C APIs as a
-- callback (no Lua frame is involved when it is called from C).
M.yield = C.ctx_yield

-- Operators and methods attached to `struct ctx_stack` values.
ffi.metatype('struct ctx_stack', {
  -- stack(target, param): prepare a yield state on this stack.
  __call = C.ctx_stack_yield_state,

  -- #stack: stack size as a plain Lua number.
  __len = function(self)
    return tonumber(self.size)
  end,

  __index = {
    -- Detach the GC finalizer first, then release the stack explicitly.
    free = function(self)
      ffi.gc(self, nil)
      C.ctx_free(self)
    end,

    -- Method form of the call operator above.
    yield_state = C.ctx_stack_yield_state,

    -- Pointer to the lowest address of the stack memory: `stack` holds the
    -- top, so the base is top minus size.
    ptr = function(self)
      local top = ffi.cast('uintptr_t', self.stack)
      return ffi.cast('uintptr_t*', top - self.size)
    end,
  }
})

-- Calling a yield state switches into (or resumes) the target context.
-- Returns true once the target has returned, false while it is still
-- suspended; raises when ctx_yield reports a negative code.
ffi.metatype('struct ctx_yield_state', {
  __call = function(state)
    local rc = C.ctx_yield(state)
    if rc >= 0 then
      return rc ~= 0
    end
    error 'invalid state'
  end,
})

--- Allocate a new context stack through ctx_new.
-- The returned cdata carries a GC finalizer (ctx_free); use stack:free() to
-- release it early.
-- @param size requested stack size; must be >= 1
-- @return the `struct ctx_stack` cdata, or nil plus an error message when
--   ctx_new reports failure (non-zero return)
function M.new(size)
  -- `size and ...` turns a missing argument into the documented error instead
  -- of an "attempt to compare nil" failure.
  if not (size and size >= 1) then error 'invalid stack size' end
  local stack = ffi.new 'struct ctx_stack'
  stack.size = size
  if C.ctx_new(stack) ~= 0 then
    return nil, 'ctx_new failed'
  end
  return ffi.gc(stack, C.ctx_free)
end

--- Prepare a yield state on a caller-provided buffer instead of a ctx_new stack.
-- The stack top is taken as `buf + size`, and ctx_stack_yield_state places the
-- state just below that top, so the returned pointer refers to memory inside
-- `buf`: the caller must keep `buf` alive for as long as the state is used.
-- @param buf cdata buffer that serves as stack memory
-- @param size size of `buf` in bytes; must be >= 0x2000
-- @param target C function to run on the buffer
-- @param param argument passed to `target`
-- @return `struct ctx_yield_state*` (NULL when `target` is NULL)
function M.set(buf, size, target, param)
  -- A nil or NULL buffer would turn into a stack top of `0 + size`, which the
  -- C side would then write to.
  if buf == nil then error 'invalid stack buffer' end
  if not (size and size >= 0x2000) then error 'invalid stack size' end
  local stack = ffi.new 'struct ctx_stack'
  stack.stack = ffi.cast('void*', ffi.cast('uintptr_t', buf) + size)
  stack.size = size
  return stack(target, param)
end

return M
#include <string.h>
#include "ctx_yield.h"

/*
 * Prepare (or re-arm) a yield state for running target(param) on `stack`.
 *
 * The state is placed at the top of the stack memory: sizeof(ctx_yield_state)
 * below stack->stack, rounded down to a 16-byte boundary.  It is zeroed, its
 * step is set to 1 ("not started"), and the state address, target and param
 * are stored in state[1].__opaque[0..2], where ctx_yield and ctx_yield_call
 * read them back.  Because the address depends only on stack->stack, repeated
 * calls for the same stack return the same pointer.
 *
 * Returns NULL when `stack`, `stack->stack` or `target` is NULL.
 */
ctx_yield_state*
ctx_stack_yield_state(ctx_stack* stack, void* target, void* param) {
  /* An unallocated stack would otherwise make the memset below write to an
     address derived from NULL. */
  if (!stack || !stack->stack || !target) { return NULL; }

  ctx_yield_state* ys = (ctx_yield_state*)(
    ((uintptr_t)stack->stack - sizeof(ctx_yield_state)) & ~(uintptr_t)15
  );

  memset(ys, 0, sizeof(ctx_yield_state));

  ys->step = 1;
  ys->state[1].__opaque[0] = (uintptr_t)ys;
  ys->state[1].__opaque[1] = (uintptr_t)target;
  ys->state[1].__opaque[2] = (uintptr_t)param;

  return ys;
}

/*
 * Entry point executed on the separate stack (handed to ctx_call by
 * ctx_yield).  Both the target and its argument are fetched before the target
 * runs, because state[1] is reused as a save area when the target yields.
 * When the target returns, its result is stored, step is reset to 0
 * ("finished") and control goes back to the context saved in state[0].
 */
void
ctx_yield_call(ctx_yield_state* ys) {
  void* arg = (void*)ys->state[1].__opaque[2];
  int (*entry)(void*) = (int (*)(void*))ys->state[1].__opaque[1];

  int rc = entry(arg);

  ys->result = rc;
  ys->step = 0;

  ctx_jump(&ys->state[0], &ys->extra[0], NULL);
}

/*
 * Switch between the caller's context and the target's context.
 *
 * The direction is chosen from ys->step:
 *   odd      - called from the outer (caller) side: start or resume the target
 *   even > 0 - called from inside the target: suspend it and return to the
 *              outer side
 *   0        - the target has finished (or the state was never armed)
 *
 * Return value on the outer side: 1 when the target has returned (its result
 * is in ys->result), 0 when it yielded and can be resumed.
 * Return value inside the target, after being resumed: ys->result.
 * Returns -1 when ys is NULL or the state cannot be entered.
 *
 * NOTE(review): ctx_mark is presumably setjmp-like (0 when saving, non-zero
 * when re-entered through ctx_jump) - confirm in ctx.h.
 */
int
ctx_yield(ctx_yield_state* ys) {
  if (ys) {
    if (ys->step & 1) {
      /* Outer side: save this context in slot 0 so the target can come back. */
      if (ctx_mark(&ys->state[0], &ys->extra[0], NULL)) {
        /* Re-entered from the target: step is 0 if it finished (set by
           ctx_yield_call) and odd if it merely yielded. */
        return !ys->step;
      } else if (ys->step == 1) {
        /* First entry: the stack pointer is the state address stored by
           ctx_stack_yield_state. */
        void* sp = (void*)ys->state[1].__opaque[0];

        if (sp && ys->state[1].__opaque[1]) {
          ys->step++;
          ctx_call(ys, ((ctx_stack){sp, 0}), ctx_yield_call);
        }
      } else {
        /* Target is suspended: make step even again and resume it from slot 1. */
        ys->step--;
        ctx_jump(&ys->state[1], &ys->extra[1], NULL);
      }
    } else if (ys->step) {
      /* Target side: save the target context in slot 1, then switch out. */
      if (ctx_mark(&ys->state[1], &ys->extra[1], NULL)) {
        /* Resumed by the outer side; hand back whatever is in result. */
        return ys->result;
      } else {
        ys->step++;
        ctx_jump(&ys->state[0], &ys->extra[0], NULL);
      }
    }
  }

  return -1;
}
#pragma once

#include "ctx.h"

typedef struct ctx_yield_state ctx_yield_state;

/*
 * Bookkeeping for one target call that can be suspended and resumed.
 * ctx_stack_yield_state places it at the top of the stack it belongs to.
 */
struct ctx_yield_state {
  /* Return value of the target once it has finished; ctx_yield also returns
     this field to the target side when the target is resumed. */
  int result;
  /* State machine counter used by ctx_yield: 1 = armed, even = running inside
     the target, odd > 1 = target suspended, 0 = finished. */
  unsigned int step;
  /* Saved contexts: [0] is the outer (caller) side, [1] the target side.
     Before the first switch, state[1].__opaque[0..2] carry the state address,
     the target and its parameter. */
  ctx_state state[2];
  /* Companion save areas passed alongside state[] to ctx_mark / ctx_jump. */
  ctx_extra extra[2];
};

/*
 * Prepare (or re-arm) a yield state at the top of `stack` for running
 * target(param).  Returns NULL when `stack` or `target` is NULL.
 */
ctx_yield_state*
ctx_stack_yield_state(ctx_stack* stack, void* target, void* param);

/*
 * Switch between the outer context and the target context of `state`.
 * On the outer side it returns 1 when the target has finished and 0 when it
 * yielded; inside the target it returns state->result after a resume.
 * Returns -1 for a NULL or unusable state.
 */
int
ctx_yield(ctx_yield_state* state);
// gcc -Os -o test_sqlite test_sqlite.c ctx.c ctx_yield.c gas-x86_64-sysv.S -lsqlite3

#include "ctx_yield.h"
#include "sqlite3.h"

#include <assert.h>
#include <stdio.h>

/* Value passed to sqlite3_progress_handler as its step-count argument. */
#define STEPS 1000
/* Recursive CTE producing 1000 rows, grouped into four result rows. */
const char* QUERY = "with recursive r(x) as"
  " (select 1 union all select x+1 from r limit 1000)"
  " select count(x) from r group by r.x % 4";

/*
 * Test driver: runs sqlite3_step on a separate stack and uses ctx_yield as the
 * SQLite progress handler, so a long-running step hands control back to the
 * loop below at regular intervals.
 *
 * Calls with side effects are checked explicitly rather than inside assert(),
 * so the program still sets itself up when compiled with NDEBUG.
 * Returns 0 after the loop has run, 1 when setup fails.
 */
int
main(int argc, const char **argv)
{
  (void)argc;
  (void)argv;

  ctx_stack stack = { NULL, 1 };
  sqlite3* db = NULL;
  sqlite3_stmt* stmt = NULL;
  ctx_yield_state* ys = NULL;
  int rc = 1;

  if (ctx_new(&stack) != 0) {
    fputs("ctx_new failed\n", stderr);
    return rc;
  }
  /* Pure check without side effects: safe to keep as an assertion. */
  assert(ctx_stack_yield_state(NULL, NULL, NULL) == NULL);

  if (sqlite3_open(":memory:", &db) != SQLITE_OK) {
    fprintf(stderr, "sqlite3_open: %s\n", sqlite3_errmsg(db));
    goto cleanup;
  }
  /* -1: read the SQL up to its terminating NUL instead of a guessed length. */
  if (sqlite3_prepare_v2(db, QUERY, -1, &stmt, NULL) != SQLITE_OK) {
    fprintf(stderr, "sqlite3_prepare_v2: %s\n", sqlite3_errmsg(db));
    goto cleanup;
  }

  ys = ctx_stack_yield_state(&stack, (void*)sqlite3_step, (void*)stmt);
  if (!ys) {
    fputs("ctx_stack_yield_state failed\n", stderr);
    goto cleanup;
  }
  assert(ys->result == 0);
  sqlite3_progress_handler(db, STEPS, (int(*)(void*))ctx_yield, ys);

  puts("Start.");

  // simulate event loop
  for (;;) {
    if (ctx_yield(ys)) {
      if (ys->result == SQLITE_ROW) {
        printf(" %d\n", sqlite3_column_int(stmt, 0));
        // rearm state, always returns same address for one stack
        ctx_stack_yield_state(&stack, (void*)sqlite3_step, (void*)stmt);
      } else {
        if (ys->result == SQLITE_DONE) {
          printf("VM steps: %d\n", sqlite3_stmt_status(stmt, SQLITE_STMTSTATUS_VM_STEP, 0));
          printf("Used memory %lld / %lld\n",
                 (long long)sqlite3_memory_used(),
                 (long long)sqlite3_memory_highwater(0));
        } else {
          printf("Error %d: %s\n", ys->result, sqlite3_errmsg(db));
        }
        break;
      }
    } else {
      // still running inside sqlite3_step
      putchar('.');
      // to abort: ys->result = 1 (ctx_yield returns it to the progress
      // handler call, and a non-zero handler result interrupts the statement)
    }
  }

  {
    /* Count untouched (zero) words from the lowest stack address upwards,
       never scanning past the top of the stack. */
    uintptr_t* top = (uintptr_t*)stack.stack;
    uintptr_t* zr = (uintptr_t*)((uintptr_t)stack.stack - stack.size);
    while (zr < top && !*zr) ++zr;
    printf("Used stack %zu / %zu\n",
           (size_t)((uintptr_t)stack.stack - (uintptr_t)zr), stack.size);
  }

  rc = 0;

cleanup:
  /* sqlite3_finalize and sqlite3_close both accept NULL as a no-op. */
  sqlite3_finalize(stmt);
  sqlite3_close(db);
  ctx_free(stack);

  return rc;
}

Comments (0)

HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.