网上找的都不太好
自己写了个 模仿tail
更新了一下代码：发现 Windows 和 Unix 文本文件的差异还是会带来一些坑。
-
# Hard limits that keep a single call cheap.
_MAX_LINES = 100            # most lines one call will ever return
_MAX_BUFFER_SIZE = 20000    # largest accepted chunk size, in bytes
_MAX_TOTAL_READ = 1048576   # give up once 1 MiB in total would be read


def _error_kind(err):
    """Return 'os' for OSError instances and 'io' otherwise (for error messages)."""
    return 'os' if isinstance(err, OSError) else 'io'


def _close_status(f):
    """Close *f* and return a short status string describing how the close went."""
    try:
        f.close()
    except (OSError, IOError) as err:
        return 'close file %s error' % _error_kind(err)
    return 'closed file'


def _close_and_raise(f, message):
    """Close *f*, then raise ParameterIllegal with the close status appended."""
    raise ParameterIllegal('%s %s' % (message, _close_status(f)))


def get_last_line(file_path, max_line_number, buffer_size=4096):
    """Return the last non-empty lines of a file, reading it backwards in chunks.

    The file is opened in binary mode and split on ``\\n`` only, so the
    returned items are byte strings (``str`` on Python 2, ``bytes`` on
    Python 3) without the trailing newline. A ``\\r`` left by Windows line
    endings is not stripped. Empty lines are never returned and do not count
    towards ``max_line_number``.

    :param file_path: path of the file to read
    :param max_line_number: number of lines wanted; values above 100 are
        treated as 100, values <= 0 yield an empty list
    :param buffer_size: size in bytes of each chunk read from the end of the
        file; must be between 1 and 20000
    :return: list of lines in file order (oldest first)
    :raise ParameterIllegal: on invalid arguments, a missing or empty file,
        any open/seek/read/close failure, or when 1 MiB or more would have
        to be read to find the requested lines
    """
    # Local import keeps the block self-contained; Integral covers both
    # ``int`` and Python 2's ``long``.
    import numbers

    if not isinstance(max_line_number, numbers.Integral):
        raise ParameterIllegal('max line number not int')
    if not isinstance(file_path, str):
        raise ParameterIllegal('file_path value error')
    if not isinstance(buffer_size, numbers.Integral):
        raise ParameterIllegal('buffer_size not int')
    if not os.path.exists(file_path):
        raise ParameterIllegal('file_path not exists')
    if buffer_size > _MAX_BUFFER_SIZE:
        raise ParameterIllegal('buffer_size large then 20000')
    if buffer_size <= 0:
        # A non-positive chunk size would never make progress.
        raise ParameterIllegal('buffer_size not positive')

    # Cap the number of lines a caller can request.
    max_line_number = min(max_line_number, _MAX_LINES)

    try:
        # Binary mode: text mode translates line endings on some platforms,
        # which makes byte offsets and read lengths disagree.
        f = open(file_path, 'rb', buffering=buffer_size)
    except (OSError, IOError) as err:
        raise ParameterIllegal('open file get %s error' % _error_kind(err))

    try:
        # Measure the size through the open handle so it matches what we read.
        try:
            f.seek(0, os.SEEK_END)
            file_size = f.tell()
        except (OSError, IOError) as err:
            _close_and_raise(f, 'read file get %s error and' % _error_kind(err))
        if file_size == 0:
            _close_and_raise(f, 'file size is 0')

        newest_first = []   # complete lines found so far, last line first
        pending = b''       # start of a line whose beginning is in an earlier chunk
        pos = file_size     # everything from pos to EOF has been consumed
        size_read = 0       # total bytes requested so far

        while pos > 0 and len(newest_first) < max_line_number:
            chunk_size = min(buffer_size, pos)
            size_read += chunk_size
            if size_read >= _MAX_TOTAL_READ:
                _close_and_raise(
                    f, 'read file get string to much long %d,' % size_read)

            pos -= chunk_size
            try:
                f.seek(pos)
                chunk = f.read(chunk_size)
            except (OSError, IOError) as err:
                _close_and_raise(
                    f, 'read file get %s error and' % _error_kind(err))

            # Everything after the first newline is a complete line; the piece
            # before it may continue into the previous (not yet read) chunk.
            pieces = (chunk + pending).split(b'\n')
            pending = pieces[0]
            for piece in reversed(pieces[1:]):
                if not piece:
                    continue
                newest_first.append(piece)
                if len(newest_first) >= max_line_number:
                    break

        # Reached the start of the file: what is left is its first line.
        if pos == 0 and pending and len(newest_first) < max_line_number:
            newest_first.append(pending)

        try:
            f.close()
        except (OSError, IOError) as err:
            raise ParameterIllegal('close file get %s error' % _error_kind(err))
    finally:
        # Safety net for unexpected exceptions; a failure here must not mask
        # the exception that is already propagating.
        if not f.closed:
            try:
                f.close()
            except (OSError, IOError):
                pass

    newest_first.reverse()
    return newest_first
阅读(6602) | 评论(0) | 转发(0) |