I recently needed to process vehicle trajectory data. The source is http://kolntrace.project.citi-lab.fr/, and the resulting CSV file is 18.9 GB, with roughly 350 million records.
The overall processing is simple: read the source file, transform each record, and save the result to another file. I ended up with four different approaches.
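For orientation, the field order below is inferred from how the scripts parse each line rather than from the dataset documentation: a record in koln.tr appears to be whitespace-separated as time, vehicleID, x, y, speed, and the target CSV reorders it to vehicleID,time,x,y,speed. A minimal sketch of that per-record transformation (the sample values are invented):

# One raw record (field order inferred from the parsing code; values invented).
raw_line = '25000 1234 6789.01 2345.67 13.9\n'

time_, vehicle_id, x, y, speed = raw_line.split()
csv_row = ','.join([vehicle_id, time_, x, y, speed]) + '\n'
print(csv_row)  # 1234,25000,6789.01,2345.67,13.9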
Approach 1
def init_csv(filename):
    # Write the CSV header line.
    csv_title = 'vehicleID,time,x_coordinates,y_coordinates,speed'
    with open(filename, 'a+', encoding="utf-8") as f:
        f.writelines(csv_title)
        f.writelines('\n')


def write_to_csv(filename, vehicleID, time, x_coordinates, y_coordinates, speed):
    # Re-opens the output file for every single record.
    with open(filename, 'a+', encoding="utf-8") as f:
        f.writelines(str(vehicleID) + ',' + str(time) + ',' + str(x_coordinates)
                     + ',' + str(y_coordinates) + ',' + str(speed))
        f.writelines('\n')


def process_line(line, i, csvfilename):
    if i <= 50:
        print(line + '\n')
    info = line.split()
    time = info[0]
    id = info[1]
    x_coordinates = info[2]
    y_coordinates = info[3]
    speed = info[4]
    # Only write records whose ID is not purely numeric.
    if id.isdigit():
        pass
    else:
        write_to_csv(csvfilename, id, time, x_coordinates, y_coordinates, speed)


def read_file_to_csv(filename, csvfilename):
    init_csv(csvfilename)
    with open(filename, 'r', encoding='utf-8') as f:
        i = 1
        for line in f:
            process_line(line, i, csvfilename)
            i = i + 1


def main():
    ORIGIN_FILE_NAME = '../../koln.tr/koln.tr'
    CSV_FILE_NAME = 'koln.csv'
    read_file_to_csv(ORIGIN_FILE_NAME, CSV_FILE_NAME)


if __name__ == '__main__':
    main()
Approach 1 is what I started with. It uses many small helper functions, so the logic is clear, but it is extremely slow: the output file is re-opened and closed for every single record, and processing the full 18.9 GB of data was estimated to take about 12 days.
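To see where the time goes, here is a small benchmark sketch (the file names and the 100,000-line count are arbitrary) comparing the open-per-record pattern of write_to_csv above against a single long-lived file handle:

import time

N = 100_000
row = '1234,25000,6789.01,2345.67,13.9\n'

# Variant A: re-open the output file for every record, as write_to_csv does.
start = time.perf_counter()
for _ in range(N):
    with open('bench_per_line.csv', 'a+', encoding='utf-8') as f:
        f.write(row)
print('open per line:', time.perf_counter() - start)

# Variant B: open the output file once and keep writing to it.
start = time.perf_counter()
with open('bench_single_open.csv', 'a+', encoding='utf-8') as f:
    for _ in range(N):
        f.write(row)
print('open once    :', time.perf_counter() - start)

On an ordinary disk the first variant is typically slower by well over an order of magnitude, which is consistent with the drop from roughly 12 days to roughly 3 hours once the file stays open (approach 2 below).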
Approach 2
def read_file_to_csv(filename, csvfilename):
    csv_title = 'vehicleID,time,x_coordinates,y_coordinates,speed'
    # Open the output file once and keep it open for the whole run.
    with open(csvfilename, 'a+', encoding="utf-8") as csvf:
        csvf.writelines(csv_title)
        csvf.writelines('\n')
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                info = line.split()
                time = info[0]
                id = info[1]
                x_coordinates = info[2]
                y_coordinates = info[3]
                speed = info[4]
                if float(speed) != 0:
                    if id.isdigit():
                        linedata = str(id) + ',' + str(time) + ',' + str(x_coordinates) \
                                   + ',' + str(y_coordinates) + ',' + str(speed)
                        if linedata.count(',') == 4:
                            csvf.writelines(linedata)
                            csvf.writelines('\n')


if __name__ == '__main__':
    ORIGIN_FILE_NAME = '../../koln.tr/koln.tr'
    CSV_FILE_NAME = '../../koln.tr/readfiledata.csv'
    read_file_to_csv(ORIGIN_FILE_NAME, CSV_FILE_NAME)
Approach 2 is an improvement on the first: the helper functions are gone and the output file is opened only once for the entire run, which greatly reduces the file I/O and function-call overhead. Because it still runs in a single process, though, it is not particularly fast; processing the 18.9 GB of data takes about 3 hours.
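Before jumping to multiprocessing, one further single-process tweak is possible; the sketch below (not the original code; the convert name and batch size are made up) accumulates output rows in memory and flushes them in batches with writelines(), so the Python-level write call no longer happens once per record:

def convert(src, dst, batch_size=100_000):
    batch = []
    with open(src, 'r', encoding='utf-8') as fin, \
         open(dst, 'w', encoding='utf-8') as fout:
        fout.write('vehicleID,time,x_coordinates,y_coordinates,speed\n')
        for line in fin:
            info = line.split()
            if len(info) != 5:
                continue                      # skip malformed lines
            t, vid, x, y, speed = info
            batch.append(vid + ',' + t + ',' + x + ',' + y + ',' + speed + '\n')
            if len(batch) >= batch_size:
                fout.writelines(batch)        # one write call per batch
                batch.clear()
        fout.writelines(batch)                # flush the remainder

In practice this usually buys only a modest improvement, because the Python parsing loop itself becomes the bottleneck, which is what motivates the multiprocessing approach below.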
Approach 3
import multiprocessing as mp
import os

ORIGIN_FILE_NAME = '../../koln.tr/koln.tr'
CSV_FILE_NAME = '../../koln.tr/data.csv'

# One shared output file handle, inherited by every worker process.
global csvf
csvf = open(CSV_FILE_NAME, 'a+', encoding="utf-8")


def is_number(n):
    try:
        num = float(n)
        # check for "nan"
        is_number = num == num  # or use `math.isnan(num)`
    except ValueError:
        is_number = False
    return is_number


def not_comma_in(n):
    s = str(n)
    if s.find(',') == -1:
        return True
    else:
        return False


def process_wrapper(chunkStart, chunkSize):
    # Each worker re-opens the source file and parses its own byte range.
    with open(ORIGIN_FILE_NAME) as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            info = line.split()
            time = info[0]
            id = info[1]
            x_coordinates = info[2]
            y_coordinates = info[3]
            speed = info[4]
            if is_number(id) and is_number(time) and is_number(x_coordinates) \
                    and is_number(y_coordinates) and is_number(speed) \
                    and not_comma_in(id) and not_comma_in(time) and not_comma_in(x_coordinates) \
                    and not_comma_in(y_coordinates) and not_comma_in(speed):
                linedata = str(id) + ',' + str(time) + ',' + str(x_coordinates) \
                           + ',' + str(y_coordinates) + ',' + str(speed)
                datas = linedata.split(',')
                if 5 == len(datas):
                    csvf.writelines(linedata)
                    csvf.writelines('\n')


def chunkify(fname, size=6138 * 10240):
    # Yield (start, length) byte ranges, advancing to a newline after each jump.
    fileEnd = os.path.getsize(fname)
    with open(fname, 'rb') as f:
        chunkEnd = f.tell()
        while True:
            chunkStart = chunkEnd
            f.seek(size, 1)
            f.readline()
            chunkEnd = f.tell()
            yield chunkStart, chunkEnd - chunkStart
            if chunkEnd > fileEnd:
                break


def main():
    # init objects
    pool = mp.Pool(processes=60)
    jobs = []

    # create jobs
    csv_title = 'vehicleID,time,x_coordinates,y_coordinates,speed'
    csvf.writelines(csv_title)
    csvf.writelines('\n')
    for chunkStart, chunkSize in chunkify(ORIGIN_FILE_NAME):
        jobs.append(pool.apply_async(process_wrapper, (chunkStart, chunkSize)))

    # wait for all jobs to finish
    for job in jobs:
        job.get()

    # clean up
    pool.close()


if __name__ == '__main__':
    main()
Approach 3 uses multiprocessing, so it needs a reasonably powerful machine to run, but it makes the processing very fast: the full 18.9 GB of data is done in under 10 minutes. It also introduces a problem, however: because the file is split into chunks for processing, many records inside the chunks end up corrupted.
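Two things can plausibly corrupt records here. First, process_wrapper re-opens the source in text mode, where seek() with an arbitrary offset and read(chunkSize), which counts characters rather than bytes, do not necessarily line up with the byte ranges produced by chunkify. Second, and probably more important, all sixty forked workers write through the same buffered csvf handle, so their buffered writes can interleave and split lines. The sketch below is not the original fix but one common remedy (the helper names and output path are made up): cut chunks strictly at newline boundaries in binary mode, parse them in the workers, and let only the parent process write the output file.

import multiprocessing as mp
import os


def chunkify(fname, size=64 * 1024 * 1024):
    # Yield (start, length) byte ranges that always end on a newline.
    file_end = os.path.getsize(fname)
    with open(fname, 'rb') as f:
        pos = 0
        while pos < file_end:
            start = pos
            f.seek(size, 1)                  # jump ~size bytes ahead
            f.readline()                     # then advance to the end of that line
            pos = min(f.tell(), file_end)
            yield start, pos - start


def process_chunk(args):
    # Parse one byte range of the source file and return CSV rows as strings.
    fname, start, length = args
    rows = []
    with open(fname, 'rb') as f:             # binary mode: seek()/read() use real bytes
        f.seek(start)
        for raw in f.read(length).splitlines():
            info = raw.decode('utf-8', errors='replace').split()
            if len(info) == 5:
                t, vid, x, y, speed = info
                rows.append(vid + ',' + t + ',' + x + ',' + y + ',' + speed + '\n')
    return rows


def convert_parallel(src, dst, workers=8):
    tasks = [(src, s, n) for s, n in chunkify(src)]
    with mp.Pool(workers) as pool, open(dst, 'w', encoding='utf-8') as out:
        out.write('vehicleID,time,x_coordinates,y_coordinates,speed\n')
        # imap keeps the chunks in order; only the parent process ever writes.
        for rows in pool.imap(process_chunk, tasks):
            out.writelines(rows)


if __name__ == '__main__':
    # Output path here is just an example.
    convert_parallel('../../koln.tr/koln.tr', '../../koln.tr/data_fixed.csv')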
Approach 4
import pandas as pd
import multiprocessing as mp

CSV_FILE_NAME = 'withspeed.csv'
TRACE_FILE = 'trace'


def process_wrapper(chunk, chunk_num):
    trace_num = 1
    # Each chunk writes to its own output file, so workers never share a handle.
    with open(TRACE_FILE + '_' + str(chunk_num) + '.csv', 'a+', encoding='utf-8') as f:
        f.write('traceID,time,x,y')
        f.write('\n')
        vehicleID = chunk['vehicleID'].drop_duplicates()
        print(len(vehicleID))
        for id in vehicleID:
            trace = chunk[chunk['vehicleID'] == id].sort_values(by=['time'])
            if len(trace) >= 60:
                x = trace['x_coordinates']
                y = trace['y_coordinates']
                time = trace['time']
                for i in range(len(trace)):
                    if i + 1 < len(trace):  # is not over bound
                        if time.values[i + 1] == time.values[i] + 1:
                            f.write(str(trace_num) + ',' + str(time.values[i]) + ','
                                    + str(x.values[i]) + ',' + str(y.values[i]))
                            f.write('\n')
                        elif time.values[i + 1] - time.values[i] >= 30:
                            # A gap of 30 s or more starts a new trace.
                            trace_num += 1
                            # f.write(str(trace_num) + ',' + str(time.values[i]) + ','
                            #         + str(x.values[i]) + ',' + str(y.values[i]))
                            # f.write('\n')
                        else:
                            pass
                trace_num += 1
        f.close()


def main():
    # init objects
    pool = mp.Pool(processes=20)
    jobs = []

    chunk_size = 5 ** 10
    chunk_num = 0
    for chunk in pd.read_csv(CSV_FILE_NAME, error_bad_lines=False, chunksize=chunk_size):
        jobs.append(pool.apply_async(process_wrapper, (chunk, chunk_num)))
        chunk_num += 1

    # wait for all jobs to finish
    for job in jobs:
        job.get()

    # clean up
    pool.close()


if __name__ == '__main__':
    main()
Approach 4 is the one I finally adopted. Reading the CSV with pandas makes it possible to split the file losslessly, i.e. to chunk it by row count, and to save each chunk's results to a separate file; this does require that the chunks be reasonably independent of one another. The final solution keeps the data correct while still processing it quickly.
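For reference, the property approach 4 relies on is that pd.read_csv with a chunksize yields an iterator of DataFrames containing complete rows only, so no record can be cut in half the way a byte-offset chunk can cut it. A minimal sketch (the file name and chunk size are illustrative):

import pandas as pd

# Each chunk is a DataFrame of at most 1,000,000 complete rows.
for i, chunk in enumerate(pd.read_csv('withspeed.csv', chunksize=1_000_000)):
    print(i, len(chunk), chunk['vehicleID'].nunique())

The price, as noted above, is that the chunks must be treated as independent: a vehicle whose trajectory straddles a chunk boundary is split across two output files, so row-count chunking preserves correctness per record but not per vehicle.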