1
0
mirror of https://github.com/exaloop/codon.git synced 2025-06-03 15:03:52 +08:00
codon/stdlib/internal/file.codon

237 lines
6.0 KiB
Python
Raw Normal View History

2022-01-24 08:00:24 +01:00
# (c) 2022 Exaloop Inc. All rights reserved.
2021-09-27 14:02:44 -04:00
from internal.gc import realloc, free
2022-01-24 08:00:24 +01:00
2021-09-27 14:02:44 -04:00
class File:
    # Thin wrapper around a C stdio stream (`FILE*`).
    #
    # NOTE: the declaration order of `sz` and `buf` must not change.
    # `_getline` passes getline() raw pointers into this object: the object's
    # base address is used as the size slot and base address + 8 as the
    # buffer slot, which presumably corresponds to `sz` at offset 0 and `buf`
    # at offset 8.
    sz: int  # buffer capacity slot handed to getline()
    buf: Ptr[byte]  # line buffer slot handed to getline(); released with C free()
    fp: cobj  # underlying stream handle; null once the file is closed

    def __init__(self, fp: cobj):
        """Wrap an already-open C stream handle."""
        self.fp = fp
        self._reset()

    def __init__(self, path: str, mode: str):
        """Open `path` with the fopen()-style `mode`.

        Raises IOError if the stream cannot be opened.
        """
        self.fp = _C.fopen(path.c_str(), mode.c_str())
        if not self.fp:
            raise IOError(f"file {path} could not be opened")
        self._reset()

    def _errcheck(self, msg: str):
        """Raise IOError mentioning `msg` if the stream's error flag is set."""
        err = int(_C.ferror(self.fp))
        if err:
            raise IOError(f"file I/O error: {msg}")

    def __enter__(self):
        pass

    def __exit__(self):
        self.close()

    def __iter__(self) -> Generator[str]:
        """Yield each line of the file, including its trailing newline."""
        for a in self._iter():
            # `_iter` yields views into the shared line buffer, which the
            # next getline() call overwrites, so hand out copies.
            yield a.__ptrcopy__()

    def readlines(self) -> List[str]:
        """Return all remaining lines as a list."""
        return [line for line in self]

    def write(self, s: str):
        """Write the bytes of `s` to the stream.

        Raises IOError if the file is closed or the write fails.
        """
        self._ensure_open()
        _C.fwrite(s.ptr, 1, len(s), self.fp)
        self._errcheck("error in write")

    def __file_write_gen__(self, g: Generator[T], T: type):
        """Write the string form of every item produced by `g`."""
        for s in g:
            self.write(str(s))

    def read(self, sz: int = -1) -> str:
        """Read up to `sz` bytes and return them as a string.

        A negative `sz` (the default) reads from the current position to the
        end of the stream; the remaining length is measured with
        ftell()/fseek(), so this form needs a seekable stream.

        Raises IOError if the file is closed or the read fails.
        """
        self._ensure_open()
        if sz < 0:
            cur = _C.ftell(self.fp)
            _C.fseek(self.fp, 0, i32(2))  # 2 == SEEK_END
            end = _C.ftell(self.fp)
            _C.fseek(self.fp, cur, i32(0))  # 0 == SEEK_SET: restore position
            self._errcheck("error in read")
            # If the position could not be determined, read nothing rather
            # than allocating a negative-sized buffer.
            sz = end - cur if end > cur else 0
        buf = Ptr[byte](sz)
        ret = _C.fread(buf, 1, sz, self.fp)
        self._errcheck("error in read")
        return str(buf, ret)

    def tell(self) -> int:
        """Return the current stream position.

        Raises IOError if the file is closed or the query fails.
        """
        self._ensure_open()
        ret = _C.ftell(self.fp)
        self._errcheck("error in tell")
        return ret

    def seek(self, offset: int, whence: int):
        """Move the stream position; `whence` is passed through to fseek().

        Raises IOError if the file is closed or the seek fails.
        """
        self._ensure_open()
        _C.fseek(self.fp, offset, i32(whence))
        self._errcheck("error in seek")

    def flush(self):
        """Flush buffered output to the underlying stream.

        Raises IOError if the file is closed or the flush fails.
        """
        self._ensure_open()
        _C.fflush(self.fp)
        self._errcheck("error in flush")

    def close(self):
        """Close the stream and release the line buffer; safe to call twice."""
        if self.fp:
            _C.fclose(self.fp)
            self.fp = cobj()
        if self.buf:
            # The buffer was handed to getline(), so it is released with the
            # C allocator's free().
            _C.free(self.buf)
            self._reset()

    def _ensure_open(self):
        """Raise IOError if the file has been closed."""
        if not self.fp:
            raise IOError("I/O operation on closed file")

    def _reset(self):
        """Clear the line buffer fields without freeing anything."""
        self.buf = Ptr[byte]()
        self.sz = 0

    def _getline(self) -> int:
        """Read the next line into `self.buf` via getline().

        Returns getline()'s result: the number of bytes stored, or -1 when no
        further line could be read. On -1 the stream's error flag is checked
        so that a read failure is not mistaken for end-of-file.
        """
        rd = _C.getline(
            Ptr[Ptr[byte]](self.__raw__() + 8), Ptr[int](self.__raw__()), self.fp
        )
        if rd == -1:
            self._errcheck("error in read")
        return rd

    def _iter(self) -> Generator[str]:
        """Yield each line as a view into the shared buffer (newline kept).

        Each yielded string is only valid until the next line is read.
        """
        self._ensure_open()
        while True:
            rd = self._getline()
            if rd == -1:
                break
            yield str(self.buf, rd)

    def _iter_trim_newline(self) -> Generator[str]:
        """Like `_iter`, but without the trailing newline byte, if present."""
        self._ensure_open()
        while True:
            rd = self._getline()
            if rd == -1:
                break
            # byte(10) is '\n'; guard against indexing before the buffer.
            if rd > 0 and self.buf[rd - 1] == byte(10):
                rd -= 1
            yield str(self.buf, rd)
2022-01-24 08:00:24 +01:00
2022-02-16 16:51:16 +01:00
def _gz_errcheck(stream: cobj):
    """Raise IOError if zlib reports a non-empty error message for `stream`."""
    code = i32(0)
    text = _C.gzerror(stream, __ptr__(code))
    # A null pointer or an empty C string means there is nothing to report.
    if not text or not text[0]:
        return
    detail = str(text, _C.strlen(text))
    raise IOError(f"zlib error: {detail}")
2021-09-27 14:02:44 -04:00
2022-01-24 08:00:24 +01:00
2021-09-27 14:02:44 -04:00
class gzFile:
    # Wrapper around a zlib `gzFile` handle.
    sz: int  # current capacity of `buf` in bytes
    buf: Ptr[byte]  # line buffer, allocated lazily by `_getline`
    fp: cobj  # zlib stream handle; null once the file is closed

    def __init__(self, fp: cobj):
        """Wrap an already-open zlib stream handle."""
        self.fp = fp
        self._reset()

    def __init__(self, path: str, mode: str):
        """Open `path` with the gzopen()-style `mode`.

        Raises IOError if the stream cannot be opened.
        """
        self.fp = _C.gzopen(path.c_str(), mode.c_str())
        if not self.fp:
            raise IOError(f"file {path} could not be opened")
        self._reset()

    def _getline(self) -> int:
        """Read the next line into `self.buf`, growing the buffer as needed.

        Returns the number of bytes stored (including the newline when one
        was read), or -1 when nothing more could be read. Raises IOError if
        zlib reports an error.
        """
        if not self.buf:
            self.sz = 128
            self.buf = Ptr[byte](self.sz)
        offset = 0
        while True:
            if not _C.gzgets(self.fp, self.buf + offset, i32(self.sz - offset)):
                _gz_errcheck(self.fp)
                if offset == 0:
                    return -1
                # Data was already collected on an earlier pass; return it as
                # a final line without a trailing newline.
                break
            offset += _C.strlen(self.buf + offset)
            # byte(10) is '\n'. The `offset > 0` guard avoids reading before
            # the start of the buffer when nothing has been measured yet.
            if offset > 0 and self.buf[offset - 1] == byte(10):
                break
            # No newline seen yet: double the buffer and keep reading.
            self.sz *= 2
            self.buf = realloc(self.buf, self.sz)
        return offset

    def __iter__(self) -> Generator[str]:
        """Yield each line of the file, including its trailing newline."""
        for a in self._iter():
            # `_iter` yields views into the shared line buffer, which the
            # next read overwrites, so hand out copies.
            yield a.__ptrcopy__()

    def __enter__(self):
        pass

    def __exit__(self):
        self.close()

    def close(self):
        """Close the stream and release the line buffer; safe to call twice."""
        if self.fp:
            _C.gzclose(self.fp)
            self.fp = cobj()
        if self.buf:
            free(self.buf)
            self._reset()

    def readlines(self) -> List[str]:
        """Return all remaining lines as a list."""
        return [line for line in self]

    def write(self, s: str):
        """Write the bytes of `s` to the compressed stream.

        Raises IOError if the file is closed or zlib reports an error.
        """
        self._ensure_open()
        _C.gzwrite(self.fp, s.ptr, u32(len(s)))
        _gz_errcheck(self.fp)

    def __file_write_gen__(self, g: Generator[T], T: type):
        """Write the string form of every item produced by `g`."""
        for s in g:
            self.write(str(s))

    def tell(self) -> int:
        """Return the current position as reported by gztell().

        Raises IOError if the file is closed or zlib reports an error.
        """
        self._ensure_open()
        ret = _C.gztell(self.fp)
        _gz_errcheck(self.fp)
        return ret

    def seek(self, offset: int, whence: int):
        """Move the stream position; `whence` is passed through to gzseek().

        Raises IOError if the file is closed or zlib reports an error.
        """
        self._ensure_open()
        _C.gzseek(self.fp, offset, i32(whence))
        _gz_errcheck(self.fp)

    def _iter(self) -> Generator[str]:
        """Yield each line as a view into the shared buffer (newline kept).

        Each yielded string is only valid until the next line is read.
        """
        self._ensure_open()
        while True:
            rd = self._getline()
            if rd == -1:
                break
            yield str(self.buf, rd)

    def _iter_trim_newline(self) -> Generator[str]:
        """Like `_iter`, but without the trailing newline byte, if present."""
        self._ensure_open()
        while True:
            rd = self._getline()
            if rd == -1:
                break
            # byte(10) is '\n'; guard against indexing before the buffer.
            if rd > 0 and self.buf[rd - 1] == byte(10):
                rd -= 1
            yield str(self.buf, rd)

    def _ensure_open(self):
        """Raise IOError if the file has been closed."""
        if not self.fp:
            raise IOError("I/O operation on closed file")

    def _reset(self):
        """Clear the line buffer fields without freeing anything."""
        self.buf = Ptr[byte]()
        self.sz = 0
2022-01-24 08:00:24 +01:00
def open(path: str, mode: str = "r") -> File:
    """Open `path` with the given `mode` and return it as a `File`."""
    handle = File(path, mode)
    return handle
2022-01-24 08:00:24 +01:00
def gzopen(path: str, mode: str = "r") -> gzFile:
    """Open `path` with the given `mode` and return it as a `gzFile`."""
    handle = gzFile(path, mode)
    return handle
2022-01-24 08:00:24 +01:00
def is_binary(path: str) -> bool:
    """Guess whether the file at `path` is binary by inspecting its start.

    Returns True if any of the first 1024 bytes falls outside the set of
    byte values treated as text.
    """
    # https://stackoverflow.com/questions/898669/how-can-i-detect-if-a-file-is-binary-non-text-in-python/7392391#7392391
    # Can produce both false positives and false negatives, but it is still a
    # clever approach that works for the large majority of files.
    control_codes = {7, 8, 9, 10, 12, 13, 27}
    # 0x20..0xFF, excluding 0x7F.
    printable_codes = set(iter(range(0x20, 0x100))) - {0x7F}
    textchars = control_codes | printable_codes
    with open(path, "rb") as f:
        header = f.read(1024)
        for ch in header:
            if ord(ch) not in textchars:
                return True
        return False