1
0
mirror of https://github.com/exaloop/codon.git synced 2025-06-03 15:03:52 +08:00

stdlib/internal/python.codon

This commit is contained in:
Ishak Numanagić 2022-01-24 08:13:03 +01:00
parent f8e29e2a3a
commit 1adc29a9f7

View File

@ -1,3 +1,5 @@
# (c) 2022 Exaloop Inc. All rights reserved.
import os import os
from internal.dlopen import * from internal.dlopen import *
@ -73,12 +75,14 @@ def __codon_repr__(fig):
""" """
_PY_INITIALIZED = False _PY_INITIALIZED = False
def init():
def init() -> void:
global _PY_INITIALIZED global _PY_INITIALIZED
if _PY_INITIALIZED: if _PY_INITIALIZED:
return return
LD = os.getenv('CODON_PYTHON', default='libpython.' + dlext()) LD = os.getenv("CODON_PYTHON", default="libpython." + dlext())
hnd = dlopen(LD, RTLD_LOCAL | RTLD_NOW) hnd = dlopen(LD, RTLD_LOCAL | RTLD_NOW)
global PyUnicode_AsEncodedString global PyUnicode_AsEncodedString
@ -154,43 +158,46 @@ def init():
global PyImport_AddModule global PyImport_AddModule
PyImport_AddModule = dlsym(hnd, "PyImport_AddModule") PyImport_AddModule = dlsym(hnd, "PyImport_AddModule")
Py_Initialize() Py_Initialize()
PyRun_SimpleString(_PY_INIT.c_str()) PyRun_SimpleString(_PY_INIT.c_str())
_PY_INITIALIZED = True _PY_INITIALIZED = True
def ensure_initialized():
def ensure_initialized() -> void:
if not _PY_INITIALIZED: if not _PY_INITIALIZED:
init() init()
# raise ValueError("Python not initialized; make sure to 'import python'") # raise ValueError("Python not initialized; make sure to 'import python'")
@extend @extend
class pyobj: class pyobj:
def __new__(p: Ptr[byte]) -> pyobj: def __new__(p: Ptr[byte]) -> pyobj:
return (p, ) return (p,)
def _getattr(self, name: str): def _getattr(self, name: str) -> pyobj:
return pyobj.exc_wrap(pyobj(PyObject_GetAttrString(self.p, name.c_str()))) return pyobj.exc_wrap(pyobj(PyObject_GetAttrString(self.p, name.c_str())))
def __setitem__(self, name: str, val: pyobj): def __setitem__(self, name: str, val: pyobj) -> pyobj:
return pyobj.exc_wrap(pyobj(PyObject_SetAttrString(self.p, name.c_str(), val.p))) return pyobj.exc_wrap(
pyobj(PyObject_SetAttrString(self.p, name.c_str(), val.p))
)
def __len__(self): def __len__(self) -> int:
return pyobj.exc_wrap(PyObject_Length(self.p)) return pyobj.exc_wrap(PyObject_Length(self.p))
def __to_py__(self): def __to_py__(self) -> pyobj:
return self return self
def __from_py__(self): def __from_py__(self) -> pyobj:
return self return self
def __str__(self): def __str__(self) -> str:
return str.__from_py__(self._getattr("__str__").__call__()) return str.__from_py__(self._getattr("__str__").__call__())
def __repr__(self): def __repr__(self) -> str:
return str.__from_py__(self._getattr("__repr__").__call__()) return str.__from_py__(self._getattr("__repr__").__call__())
def __iter__(self): def __iter__(self) -> Generator[pyobj]:
it = PyObject_GetIter(self.p) it = PyObject_GetIter(self.p)
if not it: if not it:
raise ValueError("Python object is not iterable") raise ValueError("Python object is not iterable")
@ -208,13 +215,15 @@ class pyobj:
pyobj(obj).decref() pyobj(obj).decref()
return str.from_ptr(bts) return str.from_ptr(bts)
def exc_check(): def exc_check() -> void:
ptype, pvalue, ptraceback = cobj(), cobj(), cobj() ptype, pvalue, ptraceback = cobj(), cobj(), cobj()
PyErr_Fetch(__ptr__(ptype), __ptr__(pvalue), __ptr__(ptraceback)) PyErr_Fetch(__ptr__(ptype), __ptr__(pvalue), __ptr__(ptraceback))
if ptype != cobj(): if ptype != cobj():
py_msg = PyObject_Str(pvalue) if pvalue != cobj() else pvalue py_msg = PyObject_Str(pvalue) if pvalue != cobj() else pvalue
msg = pyobj(py_msg).to_str("ignore", "<empty Python message>") msg = pyobj(py_msg).to_str("ignore", "<empty Python message>")
typ = pyobj.to_str(pyobj(PyObject_GetAttrString(ptype, "__name__".c_str())), "ignore") typ = pyobj.to_str(
pyobj(PyObject_GetAttrString(ptype, "__name__".c_str())), "ignore"
)
pyobj(ptype).decref() pyobj(ptype).decref()
pyobj(pvalue).decref() pyobj(pvalue).decref()
@ -223,14 +232,14 @@ class pyobj:
raise PyError(msg, typ) raise PyError(msg, typ)
def exc_wrap(_retval): def exc_wrap(_retval: T, T: type) -> T:
pyobj.exc_check() pyobj.exc_check()
return _retval return _retval
def incref(self): def incref(self) -> void:
Py_IncRef(self.p) Py_IncRef(self.p)
def decref(self): def decref(self) -> void:
Py_DecRef(self.p) Py_DecRef(self.p)
def __call__(self, *args, **kwargs) -> pyobj: def __call__(self, *args, **kwargs) -> pyobj:
@ -238,14 +247,16 @@ class pyobj:
kws = Dict[str, pyobj]() kws = Dict[str, pyobj]()
if staticlen(kwargs) > 0: if staticlen(kwargs) > 0:
kws = {next(names): i.__to_py__() for i in kwargs} kws = {next(names): i.__to_py__() for i in kwargs}
return pyobj.exc_wrap(pyobj(PyObject_Call(self.p, args.__to_py__().p, kws.__to_py__().p))) return pyobj.exc_wrap(
pyobj(PyObject_Call(self.p, args.__to_py__().p, kws.__to_py__().p))
)
def _tuple_new(length: int) -> pyobj: def _tuple_new(length: int) -> pyobj:
t = PyTuple_New(length) t = PyTuple_New(length)
pyobj.exc_check() pyobj.exc_check()
return pyobj(t) return pyobj(t)
def _tuple_set(self, idx: int, val: pyobj): def _tuple_set(self, idx: int, val: pyobj) -> void:
PyTuple_SetItem(self.p, idx, val.p) PyTuple_SetItem(self.p, idx, val.p)
pyobj.exc_check() pyobj.exc_check()
@ -254,7 +265,7 @@ class pyobj:
pyobj.exc_check() pyobj.exc_check()
return pyobj(t) return pyobj(t)
def _import(name: str): def _import(name: str) -> pyobj:
ensure_initialized() ensure_initialized()
if name in _PY_MODULE_CACHE: if name in _PY_MODULE_CACHE:
return _PY_MODULE_CACHE[name] return _PY_MODULE_CACHE[name]
@ -262,76 +273,88 @@ class pyobj:
_PY_MODULE_CACHE[name] = m _PY_MODULE_CACHE[name] = m
return m return m
def _exec(code: str): def _exec(code: str) -> void:
ensure_initialized() ensure_initialized()
PyRun_SimpleString(code.c_str()) PyRun_SimpleString(code.c_str())
def get(self, T: type) -> T: def get(self, T: type) -> T:
return T.__from_py__(self) return T.__from_py__(self)
def _main_module(): def _main_module() -> pyobj:
m = PyImport_AddModule("__main__".c_str()) m = PyImport_AddModule("__main__".c_str())
return pyobj(m) return pyobj(m)
def _repr_mimebundle_(self, bundle = Set[str]()): def _repr_mimebundle_(self, bundle=Set[str]()) -> Dict[str, str]:
fn = pyobj._main_module()._getattr("__codon_repr__") fn = pyobj._main_module()._getattr("__codon_repr__")
assert fn.p != cobj(), "cannot find python.__codon_repr__" assert fn.p != cobj(), "cannot find python.__codon_repr__"
mime, txt = fn.__call__(self).get(Tuple[str, str]) mime, txt = fn.__call__(self).get(Tuple[str, str])
return {mime: txt} return {mime: txt}
def none(): def none():
raise NotImplementedError() raise NotImplementedError()
# Type conversions # Type conversions
def py(x) -> pyobj: def py(x) -> pyobj:
return x.__to_py__() return x.__to_py__()
def get(x: pyobj, T: type) -> T: def get(x: pyobj, T: type) -> T:
return T.__from_py__(x) return T.__from_py__(x)
@extend @extend
class int: class int:
def __to_py__(self) -> pyobj: def __to_py__(self) -> pyobj:
return pyobj.exc_wrap(pyobj(PyLong_FromLong(self))) return pyobj.exc_wrap(pyobj(PyLong_FromLong(self)))
def __from_py__(i: pyobj): def __from_py__(i: pyobj) -> int:
return pyobj.exc_wrap(PyLong_AsLong(i.p)) return pyobj.exc_wrap(PyLong_AsLong(i.p))
@extend @extend
class float: class float:
def __to_py__(self) -> pyobj: def __to_py__(self) -> pyobj:
return pyobj.exc_wrap(pyobj(PyFloat_FromDouble(self))) return pyobj.exc_wrap(pyobj(PyFloat_FromDouble(self)))
def __from_py__(d: pyobj): def __from_py__(d: pyobj) -> float:
return pyobj.exc_wrap(PyFloat_AsDouble(d.p)) return pyobj.exc_wrap(PyFloat_AsDouble(d.p))
@extend @extend
class bool: class bool:
def __to_py__(self) -> pyobj: def __to_py__(self) -> pyobj:
return pyobj.exc_wrap(pyobj(PyBool_FromLong(int(self)))) return pyobj.exc_wrap(pyobj(PyBool_FromLong(int(self))))
def __from_py__(b: pyobj): def __from_py__(b: pyobj) -> bool:
return pyobj.exc_wrap(PyObject_IsTrue(b.p)) != 0 return pyobj.exc_wrap(PyObject_IsTrue(b.p)) != 0
@extend @extend
class byte: class byte:
def __to_py__(self) -> pyobj: def __to_py__(self) -> pyobj:
return str.__to_py__(str(__ptr__(self), 1)) return str.__to_py__(str(__ptr__(self), 1))
def __from_py__(c: pyobj): def __from_py__(c: pyobj) -> byte:
return str.__from_py__(c).p[0] return str.__from_py__(c).p[0]
@extend @extend
class str: class str:
def __to_py__(self) -> pyobj: def __to_py__(self) -> pyobj:
return pyobj.exc_wrap(pyobj(PyUnicode_DecodeFSDefaultAndSize(self.ptr, self.len))) return pyobj.exc_wrap(
pyobj(PyUnicode_DecodeFSDefaultAndSize(self.ptr, self.len))
)
def __from_py__(s: pyobj): def __from_py__(s: pyobj) -> str:
r = s.to_str("strict") r = s.to_str("strict")
pyobj.exc_check() pyobj.exc_check()
return r return r
@extend @extend
class List: class List:
def __to_py__(self) -> pyobj: def __to_py__(self) -> pyobj:
@ -344,7 +367,7 @@ class List:
idx += 1 idx += 1
return pyobj(pylist) return pyobj(pylist)
def __from_py__(v: pyobj): def __from_py__(v: pyobj) -> List[T]:
n = v.__len__() n = v.__len__()
t = List[T](n) t = List[T](n)
for i in range(n): for i in range(n):
@ -353,18 +376,19 @@ class List:
t.append(get(elem, T)) t.append(get(elem, T))
return t return t
@extend @extend
class Dict: class Dict:
def __to_py__(self) -> pyobj: def __to_py__(self) -> pyobj:
pydict = PyDict_New() pydict = PyDict_New()
pyobj.exc_check() pyobj.exc_check()
for k,v in self.items(): for k, v in self.items():
PyDict_SetItem(pydict, py(k).p, py(v).p) PyDict_SetItem(pydict, py(k).p, py(v).p)
pyobj.exc_check() pyobj.exc_check()
return pyobj(pydict) return pyobj(pydict)
def __from_py__(d: pyobj): def __from_py__(d: pyobj) -> Dict[K, V]:
b = Dict[K,V]() b = Dict[K, V]()
pos = 0 pos = 0
k_ptr = cobj() k_ptr = cobj()
v_ptr = cobj() v_ptr = cobj()
@ -375,6 +399,7 @@ class Dict:
b[k] = v b[k] = v
return b return b
@extend @extend
class Set: class Set:
def __to_py__(self) -> pyobj: def __to_py__(self) -> pyobj:
@ -384,4 +409,3 @@ class Set:
PySet_Add(pyset, py(a).p) PySet_Add(pyset, py(a).p)
pyobj.exc_check() pyobj.exc_check()
return pyobj(pyset) return pyobj(pyset)