mirror of https://github.com/exaloop/codon.git
447 lines
13 KiB
Python
447 lines
13 KiB
Python
# (c) 2022 Exaloop Inc. All rights reserved.
|
|
|
|
import os
|
|
|
|
from internal.dlopen import *
|
|
|
|
Py_DecRef = Function[[cobj], void](cobj())
|
|
Py_IncRef = Function[[cobj], void](cobj())
|
|
Py_Initialize = Function[[], void](cobj())
|
|
Py_None = cobj()
|
|
PyBool_FromLong = Function[[int], cobj](cobj())
|
|
PyBytes_AsString = Function[[cobj], cobj](cobj())
|
|
PyDict_New = Function[[], cobj](cobj())
|
|
PyDict_Next = Function[[cobj, Ptr[int], Ptr[cobj], Ptr[cobj]], int](cobj())
|
|
PyDict_SetItem = Function[[cobj, cobj, cobj], cobj](cobj())
|
|
PyErr_Fetch = Function[[Ptr[cobj], Ptr[cobj], Ptr[cobj]], void](cobj())
|
|
PyFloat_AsDouble = Function[[cobj], float](cobj())
|
|
PyFloat_FromDouble = Function[[float], cobj](cobj())
|
|
PyImport_AddModule = Function[[cobj], cobj](cobj())
|
|
PyImport_ImportModule = Function[[cobj], cobj](cobj())
|
|
PyIter_Next = Function[[cobj], cobj](cobj())
|
|
PyList_GetItem = Function[[cobj, int], cobj](cobj())
|
|
PyList_New = Function[[int], cobj](cobj())
|
|
PyList_SetItem = Function[[cobj, int, cobj], cobj](cobj())
|
|
PyLong_AsLong = Function[[cobj], int](cobj())
|
|
PyLong_FromLong = Function[[int], cobj](cobj())
|
|
PyObject_Call = Function[[cobj, cobj, cobj], cobj](cobj())
|
|
PyObject_GetAttr = Function[[cobj, cobj], cobj](cobj())
|
|
PyObject_GetAttrString = Function[[cobj, cobj], cobj](cobj())
|
|
PyObject_GetIter = Function[[cobj], cobj](cobj())
|
|
PyObject_HasAttrString = Function[[cobj, cobj], int](cobj())
|
|
PyObject_IsTrue = Function[[cobj], int](cobj())
|
|
PyObject_Length = Function[[cobj], int](cobj())
|
|
PyObject_SetAttrString = Function[[cobj, cobj, cobj], cobj](cobj())
|
|
PyObject_Str = Function[[cobj], cobj](cobj())
|
|
PyRun_SimpleString = Function[[cobj], void](cobj())
|
|
PySet_Add = Function[[cobj, cobj], cobj](cobj())
|
|
PySet_New = Function[[cobj], cobj](cobj())
|
|
PyTuple_GetItem = Function[[cobj, int], cobj](cobj())
|
|
PyTuple_New = Function[[int], cobj](cobj())
|
|
PyTuple_SetItem = Function[[cobj, int, cobj], void](cobj())
|
|
PyUnicode_AsEncodedString = Function[[cobj, cobj, cobj], cobj](cobj())
|
|
PyUnicode_DecodeFSDefaultAndSize = Function[[cobj, int], cobj](cobj())
|
|
PyUnicode_FromString = Function[[cobj], cobj](cobj())
|
|
|
|
_PY_MODULE_CACHE = Dict[str, pyobj]()
|
|
|
|
_PY_INIT = """
|
|
import io
|
|
|
|
clsf = None
|
|
clsa = None
|
|
plt = None
|
|
try:
|
|
import matplotlib.figure
|
|
import matplotlib.pyplot
|
|
plt = matplotlib.pyplot
|
|
clsf = matplotlib.figure.Figure
|
|
clsa = matplotlib.artist.Artist
|
|
except ModuleNotFoundError:
|
|
pass
|
|
|
|
def __codon_repr__(fig):
|
|
if clsf and isinstance(fig, clsf):
|
|
stream = io.StringIO()
|
|
fig.savefig(stream, format="svg")
|
|
return 'image/svg+xml', stream.getvalue()
|
|
elif clsa and isinstance(fig, list) and all(
|
|
isinstance(i, clsa) for i in fig
|
|
):
|
|
stream = io.StringIO()
|
|
plt.gcf().savefig(stream, format="svg")
|
|
return 'image/svg+xml', stream.getvalue()
|
|
elif hasattr(fig, "_repr_html_"):
|
|
return 'text/html', fig._repr_html_()
|
|
else:
|
|
return 'text/plain', fig.__repr__()
|
|
"""
|
|
|
|
_PY_INITIALIZED = False
|
|
|
|
|
|
def init_dl_handles(py_handle: cobj):
|
|
global Py_DecRef
|
|
global Py_IncRef
|
|
global Py_Initialize
|
|
global Py_None
|
|
global PyBool_FromLong
|
|
global PyBytes_AsString
|
|
global PyDict_New
|
|
global PyDict_Next
|
|
global PyDict_SetItem
|
|
global PyErr_Fetch
|
|
global PyFloat_AsDouble
|
|
global PyFloat_FromDouble
|
|
global PyImport_AddModule
|
|
global PyImport_ImportModule
|
|
global PyIter_Next
|
|
global PyList_GetItem
|
|
global PyList_New
|
|
global PyList_SetItem
|
|
global PyLong_AsLong
|
|
global PyLong_FromLong
|
|
global PyObject_Call
|
|
global PyObject_GetAttr
|
|
global PyObject_GetAttrString
|
|
global PyObject_GetIter
|
|
global PyObject_HasAttrString
|
|
global PyObject_IsTrue
|
|
global PyObject_Length
|
|
global PyObject_SetAttrString
|
|
global PyObject_Str
|
|
global PyRun_SimpleString
|
|
global PySet_Add
|
|
global PySet_New
|
|
global PyTuple_GetItem
|
|
global PyTuple_New
|
|
global PyTuple_SetItem
|
|
global PyUnicode_AsEncodedString
|
|
global PyUnicode_DecodeFSDefaultAndSize
|
|
global PyUnicode_FromString
|
|
Py_DecRef = dlsym(py_handle, "Py_DecRef")
|
|
Py_IncRef = dlsym(py_handle, "Py_IncRef")
|
|
Py_Initialize = dlsym(py_handle, "Py_Initialize")
|
|
Py_None = dlsym(py_handle, "_Py_NoneStruct")
|
|
PyBool_FromLong = dlsym(py_handle, "PyBool_FromLong")
|
|
PyBytes_AsString = dlsym(py_handle, "PyBytes_AsString")
|
|
PyDict_New = dlsym(py_handle, "PyDict_New")
|
|
PyDict_Next = dlsym(py_handle, "PyDict_Next")
|
|
PyDict_SetItem = dlsym(py_handle, "PyDict_SetItem")
|
|
PyErr_Fetch = dlsym(py_handle, "PyErr_Fetch")
|
|
PyFloat_AsDouble = dlsym(py_handle, "PyFloat_AsDouble")
|
|
PyFloat_FromDouble = dlsym(py_handle, "PyFloat_FromDouble")
|
|
PyImport_AddModule = dlsym(py_handle, "PyImport_AddModule")
|
|
PyImport_ImportModule = dlsym(py_handle, "PyImport_ImportModule")
|
|
PyIter_Next = dlsym(py_handle, "PyIter_Next")
|
|
PyList_GetItem = dlsym(py_handle, "PyList_GetItem")
|
|
PyList_New = dlsym(py_handle, "PyList_New")
|
|
PyList_SetItem = dlsym(py_handle, "PyList_SetItem")
|
|
PyLong_AsLong = dlsym(py_handle, "PyLong_AsLong")
|
|
PyLong_FromLong = dlsym(py_handle, "PyLong_FromLong")
|
|
PyObject_Call = dlsym(py_handle, "PyObject_Call")
|
|
PyObject_GetAttr = dlsym(py_handle, "PyObject_GetAttr")
|
|
PyObject_GetAttrString = dlsym(py_handle, "PyObject_GetAttrString")
|
|
PyObject_GetIter = dlsym(py_handle, "PyObject_GetIter")
|
|
PyObject_HasAttrString = dlsym(py_handle, "PyObject_HasAttrString")
|
|
PyObject_IsTrue = dlsym(py_handle, "PyObject_IsTrue")
|
|
PyObject_Length = dlsym(py_handle, "PyObject_Length")
|
|
PyObject_SetAttrString = dlsym(py_handle, "PyObject_SetAttrString")
|
|
PyObject_Str = dlsym(py_handle, "PyObject_Str")
|
|
PyRun_SimpleString = dlsym(py_handle, "PyRun_SimpleString")
|
|
PySet_Add = dlsym(py_handle, "PySet_Add")
|
|
PySet_New = dlsym(py_handle, "PySet_New")
|
|
PyTuple_GetItem = dlsym(py_handle, "PyTuple_GetItem")
|
|
PyTuple_New = dlsym(py_handle, "PyTuple_New")
|
|
PyTuple_SetItem = dlsym(py_handle, "PyTuple_SetItem")
|
|
PyUnicode_AsEncodedString = dlsym(py_handle, "PyUnicode_AsEncodedString")
|
|
PyUnicode_DecodeFSDefaultAndSize = dlsym(py_handle, "PyUnicode_DecodeFSDefaultAndSize")
|
|
PyUnicode_FromString = dlsym(py_handle, "PyUnicode_FromString")
|
|
|
|
|
|
def init(python_loaded: bool = False):
|
|
global _PY_INITIALIZED
|
|
if _PY_INITIALIZED:
|
|
return
|
|
|
|
py_handle: cobj
|
|
if python_loaded:
|
|
py_handle = dlopen("", RTLD_LOCAL | RTLD_NOW)
|
|
else:
|
|
LD = os.getenv("CODON_PYTHON", default="libpython." + dlext())
|
|
py_handle = dlopen(LD, RTLD_LOCAL | RTLD_NOW)
|
|
|
|
init_dl_handles(py_handle)
|
|
|
|
if not python_loaded:
|
|
Py_Initialize()
|
|
|
|
PyRun_SimpleString(_PY_INIT.c_str())
|
|
_PY_INITIALIZED = True
|
|
|
|
|
|
def ensure_initialized(python_loaded: bool = False):
|
|
if not _PY_INITIALIZED:
|
|
init(python_loaded)
|
|
|
|
|
|
@extend
|
|
class pyobj:
|
|
@__internal__
|
|
def __new__() -> pyobj:
|
|
pass
|
|
|
|
def __raw__(self) -> Ptr[byte]:
|
|
return __internal__.class_raw(self)
|
|
|
|
def __init__(self, p: Ptr[byte]):
|
|
self.p = p
|
|
|
|
def __del__(self):
|
|
self.decref()
|
|
|
|
def _getattr(self, name: str) -> pyobj:
|
|
return pyobj.exc_wrap(pyobj(PyObject_GetAttrString(self.p, name.c_str())))
|
|
|
|
def __setitem__(self, name: str, val: pyobj) -> pyobj:
|
|
return pyobj.exc_wrap(
|
|
pyobj(PyObject_SetAttrString(self.p, name.c_str(), val.p))
|
|
)
|
|
|
|
def __len__(self) -> int:
|
|
return pyobj.exc_wrap(PyObject_Length(self.p))
|
|
|
|
def __to_py__(self) -> pyobj:
|
|
return self
|
|
|
|
def __from_py__(self) -> pyobj:
|
|
return self
|
|
|
|
def __str__(self) -> str:
|
|
return str.__from_py__(self._getattr("__str__").__call__())
|
|
|
|
def __repr__(self) -> str:
|
|
return str.__from_py__(self._getattr("__repr__").__call__())
|
|
|
|
def __iter__(self) -> Generator[pyobj]:
|
|
it = PyObject_GetIter(self.p)
|
|
if not it:
|
|
raise ValueError("Python object is not iterable")
|
|
while i := PyIter_Next(it):
|
|
yield pyobj(pyobj.exc_wrap(i))
|
|
pyobj.decref(it)
|
|
pyobj.exc_check()
|
|
|
|
def to_str(self, errors: str, empty: str = "") -> str:
|
|
obj = PyUnicode_AsEncodedString(self.p, "utf-8".c_str(), errors.c_str())
|
|
if obj == cobj():
|
|
return empty
|
|
bts = PyBytes_AsString(obj)
|
|
pyobj.decref(obj)
|
|
return str.from_ptr(bts)
|
|
|
|
def exc_check():
|
|
ptype, pvalue, ptraceback = cobj(), cobj(), cobj()
|
|
PyErr_Fetch(__ptr__(ptype), __ptr__(pvalue), __ptr__(ptraceback))
|
|
if ptype != cobj():
|
|
py_msg = PyObject_Str(pvalue) if pvalue != cobj() else pvalue
|
|
msg = pyobj(py_msg).to_str("ignore", "<empty Python message>")
|
|
typ = pyobj(PyObject_GetAttrString(ptype, "__name__".c_str())).to_str("ignore")
|
|
|
|
pyobj.decref(ptype)
|
|
pyobj.decref(pvalue)
|
|
pyobj.decref(ptraceback)
|
|
pyobj.decref(py_msg)
|
|
|
|
raise PyError(msg, typ)
|
|
|
|
def exc_wrap(_retval: T, T: type) -> T:
|
|
pyobj.exc_check()
|
|
return _retval
|
|
|
|
def incref(self):
|
|
Py_IncRef(self.p)
|
|
|
|
def incref(obj: pyobj):
|
|
Py_IncRef(obj.p)
|
|
|
|
def incref(ptr: Ptr[byte]):
|
|
Py_IncRef(ptr)
|
|
|
|
def decref(self):
|
|
Py_DecRef(self.p)
|
|
|
|
def decref(obj: pyobj):
|
|
Py_DecRef(obj.p)
|
|
|
|
def decref(ptr: Ptr[byte]):
|
|
Py_DecRef(ptr)
|
|
|
|
def __call__(self, *args, **kwargs) -> pyobj:
|
|
names = iter(kwargs.__dict__())
|
|
kws = dict[str, pyobj]()
|
|
if staticlen(kwargs) > 0:
|
|
kws = {next(names): i.__to_py__() for i in kwargs}
|
|
return pyobj.exc_wrap(
|
|
pyobj(PyObject_Call(self.p, args.__to_py__().p, kws.__to_py__().p))
|
|
)
|
|
|
|
def _tuple_new(length: int) -> pyobj:
|
|
t = PyTuple_New(length)
|
|
pyobj.exc_check()
|
|
return pyobj(t)
|
|
|
|
def _tuple_set(self, idx: int, val: pyobj):
|
|
PyTuple_SetItem(self.p, idx, val.p)
|
|
pyobj.exc_check()
|
|
|
|
def _tuple_get(self, idx: int) -> pyobj:
|
|
t = PyTuple_GetItem(self.p, idx)
|
|
pyobj.exc_check()
|
|
return pyobj(t)
|
|
|
|
def _import(name: str) -> pyobj:
|
|
ensure_initialized()
|
|
if name in _PY_MODULE_CACHE:
|
|
return _PY_MODULE_CACHE[name]
|
|
m = pyobj.exc_wrap(pyobj(PyImport_ImportModule(name.c_str())))
|
|
_PY_MODULE_CACHE[name] = m
|
|
return m
|
|
|
|
def _exec(code: str):
|
|
ensure_initialized()
|
|
PyRun_SimpleString(code.c_str())
|
|
|
|
def get(self, T: type) -> T:
|
|
return T.__from_py__(self)
|
|
|
|
def _main_module() -> pyobj:
|
|
m = PyImport_AddModule("__main__".c_str())
|
|
return pyobj(m)
|
|
|
|
def _repr_mimebundle_(self, bundle=Set[str]()) -> Dict[str, str]:
|
|
fn = pyobj._main_module()._getattr("__codon_repr__")
|
|
assert fn.p != cobj(), "cannot find python.__codon_repr__"
|
|
mime, txt = fn.__call__(self).get(Tuple[str, str])
|
|
return {mime: txt}
|
|
|
|
|
|
def none():
|
|
raise NotImplementedError()
|
|
|
|
|
|
# Type conversions
|
|
|
|
|
|
def py(x) -> pyobj:
|
|
return x.__to_py__()
|
|
|
|
|
|
def get(x: pyobj, T: type) -> T:
|
|
return T.__from_py__(x)
|
|
|
|
|
|
@extend
|
|
class int:
|
|
def __to_py__(self) -> pyobj:
|
|
return pyobj.exc_wrap(pyobj(PyLong_FromLong(self)))
|
|
|
|
def __from_py__(i: pyobj) -> int:
|
|
return pyobj.exc_wrap(PyLong_AsLong(i.p))
|
|
|
|
|
|
@extend
|
|
class float:
|
|
def __to_py__(self) -> pyobj:
|
|
return pyobj.exc_wrap(pyobj(PyFloat_FromDouble(self)))
|
|
|
|
def __from_py__(d: pyobj) -> float:
|
|
return pyobj.exc_wrap(PyFloat_AsDouble(d.p))
|
|
|
|
|
|
@extend
|
|
class bool:
|
|
def __to_py__(self) -> pyobj:
|
|
return pyobj.exc_wrap(pyobj(PyBool_FromLong(int(self))))
|
|
|
|
def __from_py__(b: pyobj) -> bool:
|
|
return pyobj.exc_wrap(PyObject_IsTrue(b.p)) != 0
|
|
|
|
|
|
@extend
|
|
class byte:
|
|
def __to_py__(self) -> pyobj:
|
|
return str.__to_py__(str(__ptr__(self), 1))
|
|
|
|
def __from_py__(c: pyobj) -> byte:
|
|
return str.__from_py__(c).p[0]
|
|
|
|
|
|
@extend
|
|
class str:
|
|
def __to_py__(self) -> pyobj:
|
|
return pyobj.exc_wrap(
|
|
pyobj(PyUnicode_DecodeFSDefaultAndSize(self.ptr, self.len))
|
|
)
|
|
|
|
def __from_py__(s: pyobj) -> str:
|
|
r = s.to_str("strict")
|
|
pyobj.exc_check()
|
|
return r
|
|
|
|
|
|
@extend
|
|
class List:
|
|
def __to_py__(self) -> pyobj:
|
|
pylist = PyList_New(len(self))
|
|
pyobj.exc_check()
|
|
idx = 0
|
|
for a in self:
|
|
PyList_SetItem(pylist, idx, py(a).p)
|
|
pyobj.exc_check()
|
|
idx += 1
|
|
return pyobj(pylist)
|
|
|
|
def __from_py__(v: pyobj) -> List[T]:
|
|
n = v.__len__()
|
|
t = List[T](n)
|
|
for i in range(n):
|
|
elem = pyobj(PyList_GetItem(v.p, i))
|
|
pyobj.exc_check()
|
|
t.append(get(elem, T))
|
|
return t
|
|
|
|
|
|
@extend
|
|
class Dict:
|
|
def __to_py__(self) -> pyobj:
|
|
pydict = PyDict_New()
|
|
pyobj.exc_check()
|
|
for k, v in self.items():
|
|
PyDict_SetItem(pydict, py(k).p, py(v).p)
|
|
pyobj.exc_check()
|
|
return pyobj(pydict)
|
|
|
|
def __from_py__(d: pyobj) -> Dict[K, V]:
|
|
b = dict[K, V]()
|
|
pos = 0
|
|
k_ptr = cobj()
|
|
v_ptr = cobj()
|
|
while PyDict_Next(d.p, __ptr__(pos), __ptr__(k_ptr), __ptr__(v_ptr)):
|
|
pyobj.exc_check()
|
|
k = get(pyobj(k_ptr), K)
|
|
v = get(pyobj(v_ptr), V)
|
|
b[k] = v
|
|
return b
|
|
|
|
|
|
@extend
|
|
class Set:
|
|
def __to_py__(self) -> pyobj:
|
|
pyset = PySet_New(cobj())
|
|
pyobj.exc_check()
|
|
for a in self:
|
|
PySet_Add(pyset, py(a).p)
|
|
pyobj.exc_check()
|
|
return pyobj(pyset)
|