from internal.gc import realloc, free class File: sz: int buf: Ptr[byte] fp: cobj def __init__(self, fp: cobj): self.fp = fp self._reset() def __init__(self, path: str, mode: str): self.fp = _C.fopen(path.c_str(), mode.c_str()) if not self.fp: raise IOError("file " + path + " could not be opened") self._reset() def _errcheck(self, msg: str): err = int(_C.ferror(self.fp)) if err: raise IOError("file I/O error: " + msg) def __enter__(self): pass def __exit__(self): self.close() def __iter__(self): for a in self._iter(): yield a.__ptrcopy__() def readlines(self): return [l for l in self] def write(self, s: str): self._ensure_open() _C.fwrite(s.ptr, 1, len(s), self.fp) self._errcheck("error in write") def __file_write_gen__[T](self, g: Generator[T]): for s in g: self.write(str(s)) def read(self, sz: int): self._ensure_open() buf = Ptr[byte](sz) ret = _C.fread(buf, 1, sz, self.fp) self._errcheck("error in read") return str(buf, ret) def tell(self): ret = _C.ftell(self.fp) self._errcheck("error in tell") return ret def seek(self, offset: int, whence: int): _C.fseek(self.fp, offset, i32(whence)) self._errcheck("error in seek") def flush(self): _C.fflush(self.fp) def close(self): if self.fp: _C.fclose(self.fp) self.fp = cobj() if self.buf: _C.free(self.buf) self._reset() def _ensure_open(self): if not self.fp: raise IOError("I/O operation on closed file") def _reset(self): self.buf = Ptr[byte]() self.sz = 0 def _iter(self): self._ensure_open() while True: rd = _C.getline(Ptr[Ptr[byte]](self.__raw__() + 8), Ptr[int](self.__raw__()), self.fp) if rd != -1: yield str(self.buf, rd) else: break def _iter_trim_newline(self): self._ensure_open() while True: rd = _C.getline(Ptr[Ptr[byte]](self.__raw__() + 8), Ptr[int](self.__raw__()), self.fp) if rd != -1: if self.buf[rd - 1] == byte(10): rd -= 1 yield str(self.buf, rd) else: break def _gz_errcheck(stream: cobj): errnum = i32(0) msg = _C.gzerror(stream, __ptr__(errnum)) if msg and msg[0]: raise IOError("zlib error: " + str(msg, _C.strlen(msg))) class gzFile: sz: int buf: Ptr[byte] fp: cobj def __init__(self, fp: cobj): self.fp = fp self._reset() def __init__(self, path: str, mode: str): self.fp = _C.gzopen(path.c_str(), mode.c_str()) if not self.fp: raise IOError("file " + path + " could not be opened") self._reset() def _getline(self): if not self.buf: self.sz = 128 self.buf = Ptr[byte](self.sz) offset = 0 while True: if not _C.gzgets(self.fp, self.buf + offset, i32(self.sz - offset)): _gz_errcheck(self.fp) if offset == 0: return -1 break offset += _C.strlen(self.buf + offset) if self.buf[offset - 1] == byte(10): # '\n' break self.sz *= 2 self.buf = realloc(self.buf, self.sz) return offset def __iter__(self): for a in self._iter(): yield a.__ptrcopy__() def __enter__(self): pass def __exit__(self): self.close() def close(self): if self.fp: _C.gzclose(self.fp) self.fp = cobj() if self.buf: free(self.buf) self._reset() def readlines(self): return [l for l in self] def write(self, s: str): self._ensure_open() _C.gzwrite(self.fp, s.ptr, u32(len(s))) _gz_errcheck(self.fp) def __file_write_gen__[T](self, g: Generator[T]): for s in g: self.write(str(s)) def tell(self): ret = _C.gztell(self.fp) _gz_errcheck(self.fp) return ret def seek(self, offset: int, whence: int): _C.gzseek(self.fp, offset, i32(whence)) _gz_errcheck(self.fp) def _iter(self): self._ensure_open() while True: rd = self._getline() if rd != -1: yield str(self.buf, rd) else: break def _iter_trim_newline(self): self._ensure_open() while True: rd = self._getline() if rd != -1: if self.buf[rd - 1] == byte(10): rd -= 1 yield str(self.buf, rd) else: break def _ensure_open(self): if not self.fp: raise IOError("I/O operation on closed file") def _reset(self): self.buf = cobj() self.sz = 0 def open(path: str, mode: str = "r"): return File(path, mode) def gzopen(path: str, mode: str = "r"): return gzFile(path, mode) def is_binary(path: str): textchars = {7, 8, 9, 10, 12, 13, 27} | set(iter(range(0x20, 0x100))) - {0x7f} with open(path, "rb") as f: header = f.read(1024) return any(ord(c) not in textchars for c in header)