1
0
mirror of https://github.com/exaloop/codon.git synced 2025-06-03 15:03:52 +08:00
codon/stdlib/internal/file.codon
A. R. Shajii 5613c1a84b
v0.16 (#335)
* Add Python extension lowering pass

* Add DocstringAttribute

* Add extension module codegen

* Handle different argument counts efficiently

* Add warnings to extension lowering

* Fix module name

* Fix extension codegen

* Fix argument check

* Auto-convert Codon exceptions to Python exceptions

* Fix #183

* Fix #162; Fix #135

* Fix #155

* Fix CPython interface in codegen

* Fix #191

* Fix #187

* Fix #189

* Generate object file in pyext mode

* Convert Codon exceptions to Python exceptions

* Fix vtable init; Fix failing tests on Linux

* Fix #190

* Fix #156

* Fix union routing

* Remove need for import python

* Automatic @export and wrapping for toplevel functions

* Reorganize API

* Add Python extension IR structs

* Add special calls for no-suspend yield-expr

* Add special calls for no-suspend yield-expr

* pyextension.h support [wip]

* pyextension.h support [wip]

* pyextension.h support

* pyextension.h support for toplevel functions

* clang-format

* Add PyFunction::nargs field

* Update pyextension codegen (WIP)

* SUpport nargs

* Add support for @pycapture

* PyType codegen (WIP)

* Py method codegen (WIP)

* Add type ptr hook

* Add getset codegen

* Add type alloc function

* Add type pointer hook codegen

* Re-organize codegen

* Add member codegen

* Update module init codegen

* Update module init codegen

* Add support for typePtrHook and new to/from_py hooks

* Fix extension codegen

* Fix init codegen

* Fix init codegen; add "tp_new" slot

* Fix type hook

* Add extra flags

* Specialized wrappers (PyType specs)

* Add static Python link option

* Fix C imports

* Add guards

* Remove unused field

* Python mode only when pyExt set

* Update python module

* Fix assert

* Update codegen/passes

* Fix tuple parsing in index expression

* Fix empty tuple unification

* Do not Cythonize underscore fns

* clang-format

* Fix switch

* Add Py support for cmp/setitem

* Add Py support for cmp/setitem

* Add type is support

* GetSet support

* clang-format

* GetSet support (fixes)

* Avoid useless vtable alloc

* Add iter support

* Fix size_t capture bug

* clang-format

* Fix POD type unification with tuples

* Add __try_from_py__ API

* Fix annotation

* Add static reflection methods (setattr; internal.static.*); refactor PyExt to python.codon; handle errors and kwargs in PyExt

* Python compat fixes

* Update Python object conversions

* Fix PyErrors

* clang-format; add copyright

* Add PyFunction::keywords field

* Fix JIT MRO handling; Refactor out Jupyter support

* Refactor out Jupyter support

* Add support for custom linking args (link=[]) to TOML plugins

* Fix tests

* Use g++ instead of gcc

* Fix Jupyter CMAKE

* Fix Jupyter CMAKE

* Add _PyArg_Parser definition

* Add complex64 type

* Add extra complex64 tests

* Fix Python calls; add staticenumerate

* Fix call

* Fix calls

* Update pyext wrappers

* Fix staticenumerate; Support static calls in tuple()

* Fix pyext routing

* Add add/mul for tuples

* clang-format

* Fix pyext codegen

* Fix wrap_multiple

* Add seq_alloc_atomic_uncollectable

* Fix default generics issue

* Add binary/ternary ops

* Fix missing generic issue

* Fix number slots

* Update pow

* Remove unnecessary pyobj

* Fix allocation

* Refactor errors

* Add test extension

* Fix formatting

* clang-format

* Fix getitem/setitem/delitem in pyext

* Fix pyext iterators

* Add builtin pow() (fix #294)

* Fix #244

* Fix #231

* Fix #229

* Fix #205

* Update docs

* Fix error message

* Add pyext tests

* Add pyext support for @property

* Add pyext support for toplevel fns and @tuple classes

* More pyext tests

* More pyext tests

* Fix file error checking

* More pyext tests

* Update pyext tests

* Update docs

* Add pyext test to CI

* Add pyext support for @tuple.__new__

* Add pyext support for @tuple.__new__

* Fix hetero-tuple issue with fn_overloads

* More pyext tests

* Bump versions

* Fix del magic in pyext

* Fix init magic for tuples in pyext

* Have test-pypi only run on develop branch

* Make exception type indices unnamed-addr

* Fix #316; Fix #317 (slash issue)

* Use uncollectible-alloc for vtable

* Fix #249

* Add pyext docs

* Fix #249; Fix clashing vtables; Fix super() and class_copy

* Add content-atomic type property instruction

* __contents_atomic__ support

* Update internal functions

* Use PIC when generating Python extension

* Cleanup

* Add Dockerfile & fix -fPIC

* Cleanup

* Fix setup.py

* Fix pyext fn iteration

* Fix CI

* clang-format

* Update long conversions in Py bridge

* Support wide-int to str conversions

* Fix test

* Add pow for arbitrary-width ints

* Fix Linux backtraces

* Cleanup

* Add more tests

* Fix docs; Remove tuple.__add__ for scalars

* Update docs

---------

Co-authored-by: Ibrahim Numanagić <ibrahimpasa@gmail.com>
2023-04-12 18:13:54 -04:00

262 lines
6.9 KiB
Python

# Copyright (C) 2022-2023 Exaloop Inc. <https://exaloop.io>
from internal.gc import realloc, free
class File:
    # File object backed by a C stdio stream (`FILE*`).
    #
    # NOTE: the field order below is load-bearing. `_getline` passes getline()
    # raw pointers into this object, taking `sz` to live at byte offset 0 and
    # `buf` at byte offset 8 (see the `__raw__()` arithmetic there). Do not
    # reorder these fields or insert new ones in front of `buf`.
    sz: int  # capacity of `buf`; written by getline()
    buf: Ptr[byte]  # line buffer filled by getline(); released in close()
    fp: cobj  # underlying C stream; null once the file has been closed

    def __init__(self, fp: cobj):
        """
        Wrap an existing C stream `fp`. The pointer is stored as-is and is
        not validated here.
        """
        self.fp = fp
        self._reset()

    def __init__(self, path: str, mode: str):
        """
        Open `path` with the fopen()-style `mode` string.

        Raises IOError if the file could not be opened.
        """
        self.fp = _C.fopen(path.c_str(), mode.c_str())
        if not self.fp:
            raise IOError(f"file {path} could not be opened")
        self._reset()

    def _errcheck(self, msg: str):
        """
        Raise IOError (mentioning `msg`) if the stream's error indicator,
        as reported by ferror(), is set.
        """
        if int(_C.ferror(self.fp)):
            raise IOError(f"file I/O error: {msg}")

    def __enter__(self):
        """No-op: the stream is already open once the object is constructed."""
        pass

    def __exit__(self):
        """Close the file when leaving a `with` block."""
        self.close()

    def __iter__(self) -> Generator[str]:
        """
        Yield the file's lines one by one (line terminators included).
        Each line is copied via `__ptrcopy__()` because `_iter` yields
        strings that point into the shared, reused line buffer.
        """
        for a in self._iter():
            yield a.__ptrcopy__()

    def readlines(self) -> List[str]:
        """Return all remaining lines as a list."""
        return [line for line in self]

    def write(self, s: str):
        """
        Write the bytes of `s` to the stream.

        Raises IOError if the file is closed or the stream reports an error.
        """
        self._ensure_open()
        _C.fwrite(s.ptr, 1, len(s), self.fp)
        self._errcheck("error in write")

    def __file_write_gen__(self, g: Generator[T], T: type):
        """Write `str(item)` for every item produced by generator `g`."""
        for s in g:
            self.write(str(s))

    def _read_to_eof(self) -> str:
        """
        Read everything up to end-of-file in fixed-size chunks.
        Used when the stream size cannot be determined by seeking.
        """
        CHUNK = 4096
        out = _strbuf()
        chunk = Ptr[byte](CHUNK)
        while True:
            got = _C.fread(chunk, 1, CHUNK, self.fp)
            if got > 0:
                # `chunk` is reused on the next pass; this relies on
                # _strbuf.append() copying the bytes it is given.
                out.append(str(chunk, got))
            if got < CHUNK:
                # A short read from fread() means EOF or an error.
                break
        self._errcheck("error in read")
        return out.__str__()

    def read(self, sz: int = -1) -> str:
        """
        Read up to `sz` bytes and return them as a string. If `sz` is
        negative (the default), read everything from the current position
        to end-of-file.

        Raises IOError if the file is closed or the stream reports an error.
        """
        self._ensure_open()
        if sz < 0:
            SEEK_SET = 0
            SEEK_END = 2
            cur = _C.ftell(self.fp)
            if cur < 0:
                # ftell() failed, so the stream is not seekable (pipe,
                # terminal, ...) and its size cannot be computed up front.
                return self._read_to_eof()
            _C.fseek(self.fp, 0, i32(SEEK_END))
            end = _C.ftell(self.fp)
            _C.fseek(self.fp, cur, i32(SEEK_SET))
            # Clamp so a position at/after EOF never yields a negative size.
            sz = end - cur if end > cur else 0
        buf = Ptr[byte](sz)
        ret = _C.fread(buf, 1, sz, self.fp)
        self._errcheck("error in read")
        return str(buf, ret)

    def tell(self) -> int:
        """
        Return the current stream position.

        Raises IOError if the file is closed or the position is unavailable.
        """
        self._ensure_open()
        ret = _C.ftell(self.fp)
        if ret < 0:
            # ftell() signals failure through its return value; the ferror()
            # check below would not notice it.
            raise IOError("file I/O error: error in tell")
        self._errcheck("error in tell")
        return ret

    def seek(self, offset: int, whence: int = 0):
        """
        Move the stream position to `offset`, interpreted according to the
        fseek() `whence` code (0 = start, 1 = current, 2 = end).

        Raises IOError if the file is closed or the stream reports an error.
        """
        self._ensure_open()
        _C.fseek(self.fp, offset, i32(whence))
        self._errcheck("error in seek")

    def flush(self):
        """
        Flush buffered output to the underlying file.

        Raises IOError if the file is closed or the stream reports an error.
        """
        self._ensure_open()
        _C.fflush(self.fp)
        self._errcheck("error in flush")

    def close(self):
        """
        Close the stream and release the line buffer. Safe to call more
        than once: each step is skipped when already done.
        """
        if self.fp:
            _C.fclose(self.fp)
            self.fp = cobj()
        if self.buf:
            # The buffer is handed out by getline(), hence the C free().
            _C.free(self.buf)
            self._reset()

    def _ensure_open(self):
        """Raise IOError if the file has already been closed."""
        if not self.fp:
            raise IOError("I/O operation on closed file")

    def _reset(self):
        """Mark the line buffer as empty/unallocated."""
        self.buf = Ptr[byte]()
        self.sz = 0

    def _getline(self) -> int:
        """
        Read the next line into `self.buf` using getline() and return its
        length, or -1 when getline() reports end-of-file/failure.
        """
        # getline() receives pointers to the `buf` field (offset 8) and the
        # `sz` field (offset 0) of this object and updates both in place.
        return _C.getline(
            Ptr[Ptr[byte]](self.__raw__() + 8), Ptr[int](self.__raw__()), self.fp
        )

    def _iter(self) -> Generator[str]:
        """
        Yield each line (terminator included) as a string that points into
        the shared line buffer; it is only valid until the next line is read.
        """
        self._ensure_open()
        while True:
            rd = self._getline()
            if rd == -1:
                break
            yield str(self.buf, rd)

    def _iter_trim_newline(self) -> Generator[str]:
        """
        Like `_iter`, but a single trailing '\\n' (byte 10) is dropped from
        each yielded line.
        """
        self._ensure_open()
        while True:
            rd = self._getline()
            if rd == -1:
                break
            if self.buf[rd - 1] == byte(10):  # '\n'
                rd -= 1
            yield str(self.buf, rd)
def _gz_errcheck(stream: cobj):
    """
    Raise IOError if gzerror() returns a non-empty message for the zlib
    stream `stream`; otherwise do nothing.
    """
    code = i32(0)
    msg = _C.gzerror(stream, __ptr__(code))
    # A null pointer or an empty C string both mean "nothing to report".
    if not msg:
        return
    if not msg[0]:
        return
    raise IOError(f"zlib error: {str(msg, _C.strlen(msg))}")
class gzFile:
    # gzip-compressed file object backed by a zlib `gzFile` handle.
    sz: int  # capacity of `buf` in bytes
    buf: Ptr[byte]  # growable line buffer used by `_getline`
    fp: cobj  # zlib handle; null once the file has been closed

    def __init__(self, fp: cobj):
        """
        Wrap an existing zlib handle `fp`. The pointer is stored as-is and
        is not validated here.
        """
        self.fp = fp
        self._reset()

    def __init__(self, path: str, mode: str):
        """
        Open `path` with the gzopen()-style `mode` string.

        Raises IOError if the file could not be opened.
        """
        self.fp = _C.gzopen(path.c_str(), mode.c_str())
        if not self.fp:
            raise IOError(f"file {path} could not be opened")
        self._reset()

    def _getline(self) -> int:
        """
        Read the next line into `self.buf`, growing the buffer as needed.

        Returns the number of bytes read (terminator included), or -1 if
        nothing could be read. Raises IOError on a zlib error.
        """
        if not self.buf:
            self.sz = 128
            self.buf = Ptr[byte](self.sz)

        offset = 0
        while True:
            if not _C.gzgets(self.fp, self.buf + offset, i32(self.sz - offset)):
                _gz_errcheck(self.fp)
                if offset == 0:
                    return -1
                # Data read on earlier passes forms a final, unterminated line.
                break

            offset += _C.strlen(self.buf + offset)
            # `offset > 0` keeps us from inspecting buf[-1] when strlen()
            # reported zero bytes for this pass.
            if offset > 0 and self.buf[offset - 1] == byte(10):  # '\n'
                break

            # No newline yet: double the buffer and keep reading this line.
            oldsz = self.sz
            self.sz *= 2
            self.buf = realloc(self.buf, self.sz, oldsz)

        return offset

    def __iter__(self) -> Generator[str]:
        """
        Yield the file's lines one by one (line terminators included).
        Each line is copied via `__ptrcopy__()` because `_iter` yields
        strings that point into the shared, reused line buffer.
        """
        for a in self._iter():
            yield a.__ptrcopy__()

    def __enter__(self):
        """No-op: the file is already open once the object is constructed."""
        pass

    def __exit__(self):
        """Close the file when leaving a `with` block."""
        self.close()

    def close(self):
        """
        Close the zlib handle and release the line buffer. Safe to call
        more than once: each step is skipped when already done.
        """
        if self.fp:
            _C.gzclose(self.fp)
            self.fp = cobj()
        if self.buf:
            free(self.buf)
            self._reset()

    def readlines(self) -> List[str]:
        """Return all remaining lines as a list."""
        return [line for line in self]

    def write(self, s: str):
        """
        Compress and write the bytes of `s`.

        Raises IOError if the file is closed or zlib reports an error.
        """
        self._ensure_open()
        _C.gzwrite(self.fp, s.ptr, u32(len(s)))
        _gz_errcheck(self.fp)

    def __file_write_gen__(self, g: Generator[T], T: type):
        """Write `str(item)` for every item produced by generator `g`."""
        for s in g:
            self.write(str(s))

    def read(self, sz: int = -1) -> str:
        """
        Read up to `sz` uncompressed bytes and return them as a string. If
        `sz` is negative (the default), read everything up to end-of-file.

        Raises IOError if the file is closed or zlib reports an error.
        """
        self._ensure_open()
        if sz < 0:
            # Pull raw chunks with gzread() rather than stitching lines
            # together: line reads measure their length with strlen(), so
            # they are not byte-exact for data containing NUL bytes.
            CHUNK = 4096
            out = _strbuf()
            chunk = Ptr[byte](CHUNK)
            while True:
                got = int(_C.gzread(self.fp, chunk, u32(CHUNK)))
                _gz_errcheck(self.fp)
                if got <= 0:
                    break
                # `chunk` is reused on the next pass; this relies on
                # _strbuf.append() copying the bytes it is given.
                out.append(str(chunk, got))
            return out.__str__()
        buf = Ptr[byte](sz)
        ret = _C.gzread(self.fp, buf, u32(sz))
        _gz_errcheck(self.fp)
        return str(buf, int(ret))

    def tell(self) -> int:
        """
        Return the position reported by gztell().

        Raises IOError if the file is closed or zlib reports an error.
        """
        self._ensure_open()
        ret = _C.gztell(self.fp)
        _gz_errcheck(self.fp)
        return ret

    def seek(self, offset: int, whence: int = 0):
        """
        Move the position to `offset`, interpreted according to the
        gzseek() `whence` code (0 = start, the default).

        Raises IOError if the file is closed or zlib reports an error.
        """
        self._ensure_open()
        _C.gzseek(self.fp, offset, i32(whence))
        _gz_errcheck(self.fp)

    def flush(self):
        """
        Flush pending output through gzflush() using the Z_FINISH mode.

        Raises IOError if the file is closed or zlib reports an error.
        """
        Z_FINISH = 4
        self._ensure_open()
        _C.gzflush(self.fp, i32(Z_FINISH))
        _gz_errcheck(self.fp)

    def _iter(self) -> Generator[str]:
        """
        Yield each line (terminator included) as a string that points into
        the shared line buffer; it is only valid until the next line is read.
        """
        self._ensure_open()
        while True:
            rd = self._getline()
            if rd == -1:
                break
            yield str(self.buf, rd)

    def _iter_trim_newline(self) -> Generator[str]:
        """
        Like `_iter`, but a single trailing '\\n' (byte 10) is dropped from
        each yielded line.
        """
        self._ensure_open()
        while True:
            rd = self._getline()
            if rd == -1:
                break
            if self.buf[rd - 1] == byte(10):  # '\n'
                rd -= 1
            yield str(self.buf, rd)

    def _ensure_open(self):
        """Raise IOError if the file has already been closed."""
        if not self.fp:
            raise IOError("I/O operation on closed file")

    def _reset(self):
        """Mark the line buffer as empty/unallocated."""
        self.buf = cobj()
        self.sz = 0
def open(path: str, mode: str = "r") -> File:
    """
    Open `path` and return it as a `File`. `mode` is passed through to the
    `File` constructor and defaults to "r".
    """
    handle = File(path, mode)
    return handle
def gzopen(path: str, mode: str = "r") -> gzFile:
    """
    Open the gzip file at `path` and return it as a `gzFile`. `mode` is
    passed through to the `gzFile` constructor and defaults to "r".
    """
    handle = gzFile(path, mode)
    return handle
def is_binary(path: str) -> bool:
    """
    Heuristically decide whether the file at `path` is binary: return True
    if any of its first 1024 bytes falls outside a set of "text" byte values.
    """
    # Heuristic taken from:
    # https://stackoverflow.com/questions/898669/how-can-i-detect-if-a-file-is-binary-non-text-in-python/7392391#7392391
    # It can produce both false positives and false negatives, but works
    # for the large majority of files.
    control_codes = {7, 8, 9, 10, 12, 13, 27}
    high_range = set(iter(range(0x20, 0x100))) - {0x7F}
    textchars = control_codes | high_range
    with open(path, "rb") as f:
        header = f.read(1024)
        for c in header:
            if ord(c) not in textchars:
                return True
        return False