natural-merge-sort/naturalmerge.py

473 lines
14 KiB
Python

import os
import itertools
import math
from random import randint
# Number of records held by one in-memory read/write buffer (one block).
BUFFER_SIZE = 32
# Maximum number of one-byte items stored in a single record (set).
SET_BYTES_SIZE = 15
# On-disk size of a record: one length byte followed by the zero-padded items.
RECORD_BYTES_SIZE = SET_BYTES_SIZE + 1
# Size in bytes of one full buffer of records.
BYTES_BUFFER_SIZE = BUFFER_SIZE * RECORD_BYTES_SIZE
class ReadBuffer:
    """Block-buffered sequential reader of the records stored in a tape file.

    Records are fetched from disk one block at a time; a block holds up to
    ``size`` records of ``RECORD_BYTES_SIZE`` bytes each.  Every block that is
    actually read from disk increments ``disk_reads_count``.  The buffer is
    iterable and yields records until the tape is exhausted.
    """

    def __init__(self, file_path, size=None):
        """Measure the tape at *file_path* and pre-load its first block.

        ``size`` is the buffer capacity in records and defaults to
        ``BUFFER_SIZE``.  ``os.path.getsize`` raises ``OSError`` when the file
        does not exist.
        """
        self.read_pos = 0
        self.size = BUFFER_SIZE if size is None else size
        self.loaded_size = 0
        self.buffer = []
        self.file_path = file_path
        self.file_pos = 0
        self.file_size = os.path.getsize(file_path)
        self.disk_reads_count = 0
        self.load_next()

    def read_next(self):
        """Return the next record and advance, or None if there is no next record."""
        record = self.peek()
        if record is None:
            return None
        self.read_pos += 1
        # Refill eagerly so that peek() can always see the record that follows,
        # even when it lives in the next block on disk.
        if self.read_pos >= self.loaded_size and self.file_pos < self.file_size:
            self.load_next()
        return record

    def has_more(self):
        """Return True while unread records remain in the buffer or on disk."""
        return (self.read_pos < self.loaded_size
                or self.file_pos < self.file_size)

    def peek(self):
        """Return the next record without consuming it, or None at the end."""
        if self.read_pos >= self.loaded_size:
            return None
        return self.buffer[self.read_pos]

    def load_next(self):
        """Replace the buffer contents with the next block read from disk.

        Nothing is read (and no disk read is counted) when the whole file has
        already been consumed.  Raises ``ValueError`` when the bytes read do
        not form whole records.
        """
        self.buffer = []
        self.loaded_size = 0
        self.read_pos = 0
        bytes_to_read = min(
            self.size * RECORD_BYTES_SIZE,
            self.file_size - self.file_pos
        )
        if bytes_to_read <= 0:
            return
        # buffering=0 disables Python's own buffering; it is desired, because
        # buffering is implemented here, in code
        with open(self.file_path, "rb", buffering=0) as file:
            file.seek(self.file_pos)
            chunk = file.read(bytes_to_read)
        self.disk_reads_count += 1
        if not chunk:
            # The file shrank after its size was measured; treat it as ended
            # so that has_more() cannot stay True forever.
            self.file_size = self.file_pos
            return
        if len(chunk) % RECORD_BYTES_SIZE != 0:
            raise ValueError("Read bytes are not multiply of record size")
        # Advance by what was really read, not by what was requested.
        self.file_pos += len(chunk)
        self.buffer = [
            Record.load_from_ints(list(chunk[start:start + RECORD_BYTES_SIZE]))
            for start in range(0, len(chunk), RECORD_BYTES_SIZE)
        ]
        self.loaded_size = len(self.buffer)

    def __iter__(self):
        return self

    def __next__(self):
        next_record = self.read_next()
        if next_record is None:
            raise StopIteration
        return next_record
class WriteBuffer:
    """Block-buffered sequential writer of records to a tape file.

    Records are collected in memory and appended to the file one block (up to
    ``size`` records) at a time; every block written increments
    ``disk_writes_count``.  ``runs_written`` counts the runs (non-decreasing
    series) among the records passed to ``write_next``.
    """

    def __init__(self, file_path, append_mode=False, size=None):
        """Prepare a writer for the tape at *file_path*.

        Unless ``append_mode`` is set the tape is emptied.  The file is
        truncated rather than removed, so an empty tape still exists on disk
        and can be opened by a reader even if no record is ever written.
        ``size`` is the buffer capacity in records and defaults to
        ``BUFFER_SIZE``.
        """
        self.write_pos = 0
        self.size = BUFFER_SIZE if size is None else size
        self.buffer = [None] * self.size
        self.file_path = file_path
        if not append_mode:
            with open(file_path, "wb"):
                pass
        self.runs_written = 0
        self.last_written = None
        self.disk_writes_count = 0

    def write_next(self, record):
        """Queue *record* for writing, flushing first when the buffer is full."""
        # A new run starts with the very first record and whenever a record
        # is smaller than the one written just before it.
        if self.last_written is None or record < self.last_written:
            self.runs_written += 1
        if self.write_pos == self.size:
            self.flush()
        self.buffer[self.write_pos] = record
        self.write_pos += 1
        self.last_written = record

    def save_next(self):
        """Append the buffered records to the file as one disk write."""
        ints_to_write = []
        for record in self.buffer[:self.write_pos]:
            ints_to_write.extend(record.save_to_ints())  # type: ignore
        # buffering=0: blocks are assembled here, so Python's buffering is off
        with open(self.file_path, "ab", buffering=0) as file:
            file.write(bytearray(ints_to_write))
        self.disk_writes_count += 1

    def flush(self):
        """Write out any buffered records and empty the buffer."""
        if self.write_pos > 0:
            self.save_next()
        self.write_pos = 0
class Record:
    """A set of numbers stored on a tape as one fixed-size record.

    On disk a record is a length byte followed by the items, zero-padded to
    ``RECORD_BYTES_SIZE`` bytes.

    Ordering: elements shared by both sets are ignored; of what remains, the
    set holding the largest element is the greater one.  When one set has
    nothing left it is the smaller one, and two sets with the same elements
    are not less than each other.
    """

    def __init__(self, items):
        self.items = items

    @staticmethod
    def load_from_ints(record_ints):
        """Build a record from its on-disk form (length byte, then items)."""
        set_length = record_ints[0]
        return Record(record_ints[1:set_length + 1])

    def save_to_ints(self):
        """Return the on-disk form: length, items, zero padding.

        Raises ``ValueError`` when the set has more items than fit in a
        record; writing it anyway would misalign every following record.
        """
        if len(self.items) > RECORD_BYTES_SIZE - 1:
            raise ValueError(
                f"A record holds at most {RECORD_BYTES_SIZE - 1} items, "
                f"got {len(self.items)}"
            )
        result = [len(self.items), *self.items]
        result += [0] * (RECORD_BYTES_SIZE - len(result))
        return result

    def __repr__(self):
        return f"Set {sorted(self.items, reverse=True)}"

    def __lt__(self, other):
        # Every record is smaller than "no record" (None).
        if other is None:
            return True
        own_rest = list(self.items)
        other_rest = list(other.items)
        # Walk over the untouched original list: removing from the list that
        # is being iterated would skip elements and leave common items behind.
        for item in self.items:
            if item in other_rest:
                own_rest.remove(item)
                other_rest.remove(item)
        if not other_rest:
            return False
        if not own_rest:
            return True
        return max(other_rest) > max(own_rest)
class RunIterator:
    """Iterates over the records of a single run taken from a read buffer.

    The run ends as soon as the record waiting in the buffer is smaller than
    the one just returned; that waiting record is left unread.
    """

    def __init__(self, read_buffer):
        self.read_buffer = read_buffer
        self.current_record = None
        self.end_of_run = False

    def read_next(self):
        """Return the next record of this run, or None once the run is over."""
        if self.end_of_run:
            return None
        fetched = self.read_buffer.read_next()
        self.current_record = fetched
        if fetched is None:
            return None
        # Look ahead (without consuming) to notice the run boundary.
        follower = self.read_buffer.peek()
        if follower is None or not follower < fetched:
            return fetched
        self.end_of_run = True
        return fetched

    def __iter__(self):
        return self

    def __next__(self):
        fetched = self.read_next()
        if fetched is not None:
            return fetched
        raise StopIteration
def print_tape(file_name):
    """Print every record of the tape, run by run, followed by the totals."""
    print(f"[ .. ] Tape {file_name}\n")
    tape = ReadBuffer(file_name)
    series_total = 0
    records_total = 0
    while tape.has_more():
        run = RunIterator(tape)
        record = run.read_next()
        while record is not None:
            print(record)
            records_total += 1
            record = run.read_next()
        series_total += 1
        print("~ series end ~")
    print(f"\n[ ^- ] Series count: {series_total}")
    print(f"[ ^- ] Records count: {records_total}")
def print_runs(file_name, n):
    """Print the records of the first *n* runs stored on the tape."""
    print(f"Printing first {n} runs from {file_name}")
    tape = ReadBuffer(file_name)
    for run_index in range(n):
        print(f"\nRun {run_index}:")
        run = RunIterator(tape)
        record = run.read_next()
        while record is not None:
            print(record)
            record = run.read_next()
def runs_count(file_name):
    """Return the number of runs stored on the tape."""
    tape = ReadBuffer(file_name)
    total = 0
    while tape.has_more():
        run = RunIterator(tape)
        # Consume the whole run; only its existence matters here.
        while run.read_next() is not None:
            pass
        total += 1
    return total
# ==============================================================================
def prepare_tapes():
    """Copy fs/start_tape onto fs/t1 and remove the fs/t2 and fs/t3 tapes."""
    writer = WriteBuffer("fs/t1")
    reader = ReadBuffer("fs/start_tape")
    record = reader.read_next()
    while record is not None:
        writer.write_next(record)
        record = reader.read_next()
    writer.flush()
    for stale_tape in ("fs/t2", "fs/t3"):
        if os.path.isfile(stale_tape):
            os.remove(stale_tape)
class MetaInfo:
    """Statistics of one distribute or merge pass: disk reads, disk writes
    and the number of runs written."""

    def __init__(self, reads_count, writes_count, runs_count):
        (self.reads_count,
         self.writes_count,
         self.runs_count) = reads_count, writes_count, runs_count
def distribute(source_tape_path, first_dest_path, second_dest_path):
    """Distribute the runs of the source tape alternately onto two tapes.

    The first run goes to the first destination tape; every time a record is
    smaller than its predecessor (a new run starts) the destination toggles.

    Returns a MetaInfo with the disk reads of the source, the combined disk
    writes of both destinations and the combined number of runs written.
    Raises ValueError when the source tape holds no records; the check is
    done before the destination tapes are touched.
    """
    source = ReadBuffer(source_tape_path)
    if not source.has_more():
        raise ValueError(f"Cannot distribute an empty tape: {source_tape_path}")
    first_dest = WriteBuffer(first_dest_path)
    second_dest = WriteBuffer(second_dest_path)

    dest = first_dest
    last_record = None
    for record in source:
        # Not sorted pair of records: a new run begins, so toggle the tape
        if last_record is not None and record < last_record:
            dest = second_dest if dest is first_dest else first_dest
        dest.write_next(record)
        last_record = record

    first_dest.flush()
    second_dest.flush()
    return MetaInfo(source.disk_reads_count,
                    first_dest.disk_writes_count + second_dest.disk_writes_count,
                    first_dest.runs_written + second_dest.runs_written)
# Runs are read alternately from t2 and t3, one record at a time, because a
# whole run cannot be loaded into memory.
# One of the two runs may end first; when that happens the remaining records
# of the run that has not ended are written out.
# If either t2 or t3 ends, all remaining runs from the non-empty tape are
# simply written to t1.
# Each pair of runs is merged record by record, creating a new run.
# That run is written to t1.
def merge_runs(rit1, rit2, write_buffer: WriteBuffer):
    """Merge one run from each iterator into a single run on *write_buffer*.

    The smaller head record is written first; when the heads are not ordered
    by ``<`` the record from *rit2* goes first.  Once one run ends, the rest
    of the other run is copied unchanged.
    """
    left = rit1.read_next()
    right = rit2.read_next()
    while not (left is None or right is None):
        if left < right:
            write_buffer.write_next(left)
            left = rit1.read_next()
            continue
        write_buffer.write_next(right)
        right = rit2.read_next()
    # At most one of the runs still has records; drain it.
    for head, rest in ((left, rit1), (right, rit2)):
        if head is None:
            continue
        write_buffer.write_next(head)
        for record in rest:
            write_buffer.write_next(record)
def merge(first_source_path, second_source_path, dest_tape_path):
    """Merge the runs of two source tapes pairwise onto the destination tape.

    Returns a MetaInfo with the combined disk reads of both sources, the disk
    writes of the destination and the number of runs written to it.
    """
    dest = WriteBuffer(dest_tape_path)
    first_source = ReadBuffer(first_source_path)
    second_source = ReadBuffer(second_source_path)

    while first_source.has_more() and second_source.has_more():
        first_run = RunIterator(first_source)
        second_run = RunIterator(second_source)
        merge_runs(first_run, second_run, dest)

    # One tape is exhausted; copy whatever is left on the other one.
    for leftover in (first_source, second_source):
        record = leftover.read_next()
        while record is not None:
            dest.write_next(record)
            record = leftover.read_next()

    dest.flush()
    total_reads = first_source.disk_reads_count + second_source.disk_reads_count
    return MetaInfo(total_reads, dest.disk_writes_count, dest.runs_written)
class SortInfo:
    """Totals of a whole sort: disk reads, disk writes and phases performed."""

    def __init__(self, reads_count, writes_count, phases_count):
        (self.reads_count,
         self.writes_count,
         self.phases_count) = reads_count, writes_count, phases_count
def tape_sort(tape_path, print_after_phase=False):
    """Sort the tape in place using natural merge with two auxiliary tapes.

    Each phase distributes the runs of the tape onto fs/t2 and fs/t3 and
    merges them back; phases repeat until the merge writes a single run.
    When ``print_after_phase`` is true the tape is displayed after every
    phase.

    Returns a SortInfo with the total disk reads, disk writes and phases.
    An empty tape is already sorted and yields SortInfo(0, 0, 0).
    """
    reads_count = 0
    writes_count = 0
    phases_count = 0
    # Nothing to distribute on an empty tape, and a merge of nothing writes
    # zero runs, so the loop below must not be entered at all.
    if os.path.getsize(tape_path) == 0:
        return SortInfo(reads_count, writes_count, phases_count)
    while True:
        dist_info = distribute(tape_path, "fs/t2", "fs/t3")
        merge_info = merge("fs/t2", "fs/t3", tape_path)
        runs_written = merge_info.runs_count
        reads_count += dist_info.reads_count + merge_info.reads_count
        writes_count += dist_info.writes_count + merge_info.writes_count
        phases_count += 1
        if print_after_phase:
            print(f"[ -v ] Phase {phases_count}")
            print_tape(tape_path)
            print(f"[ .. ] {runs_written} series remaining")
        if runs_written <= 1:
            break
    return SortInfo(reads_count, writes_count, phases_count)
# Text printed by the "help" command: one entry per supported command.
help_page = """
help
displays the help page
clear <tape_path>
clears (removes) the tape
genrandom <path_to_tape> <records_count> [options]
adds record_count random records at the end of the tape
if 'o' is provided as an option then tape is overwritten with these
newly generated random records
add <path_to_tape> <numbers, ...>
adds new record (set created from 1 to 15 numbers provided in range 0-255)
at the end of the tape
load <path_to_tape> <path_to_test_file>
adds the records described in test file to the tape
sort <path_to_tape> [options]
sorts the provided tape displaying its contents before and after sorting
when option 'v' is provided then tape will be displayed after each
phase
display <path_to_tape>
displays the tape's records and information how many series and records it
contains. Also when displaying records it is shown when each of series
ends
exit
gracefully exits
"""
print("\nSorting using natural merge method (2+1)")
print("Author: Maciej Krzyżanowski\n")
should_run = True
while should_run:
cmd_line = input("> ")
match cmd_line.split():
case ["clear", tape_path]:
print(f"[ .. ] Clearing a tape {tape_path}")
if os.path.isfile(tape_path):
os.remove(tape_path)
print(f"[ :) ] Cleared the tape")
else:
print(f"[ :( ] There does not exist a tape at provided path")
case ["genrandom", tape_path, number_of_records, *options]:
if "o" in options:
write_buffer = WriteBuffer(tape_path)
else:
write_buffer = WriteBuffer(
tape_path, append_mode=True)
for i in range(int(number_of_records)):
set_length = randint(1, 15)
new_set = []
while len(new_set) != set_length:
new_suggestion = randint(0, 255)
if new_suggestion not in new_set:
new_set.append(new_suggestion)
new_record = Record(new_set)
write_buffer.write_next(new_record)
write_buffer.flush()
print(f"[ :) ] Added {number_of_records} new records to " +
f"tape {tape_path}")
case ["display", tape_path]:
print(f"[ :) ] Displaying tape {tape_path}")
print_tape(tape_path)
case ["add", tape_path, *set_elements]:
if len(set_elements) == 0:
print("[ :( ] No number was given")
continue
set_elements = [int(x) for x in set_elements]
new_record = Record(set_elements)
write_buffer = WriteBuffer(tape_path, append_mode=True)
write_buffer.write_next(new_record)
write_buffer.flush()
print(f"[ :) ] Dopisano podany rekord na taśmę")
case ["sort", tape_path, *options]:
print(f"[ .. ] Sorting tape {tape_path}")
print(f"[ -v ] Displaying tape before sorting:")
print_tape(tape_path)
if "v" in options:
sort_info = tape_sort(tape_path, print_after_phase=True)
else:
sort_info = tape_sort(tape_path)
print(f"[ -v ] Displaying tape after sorting:")
print_tape(tape_path)
print(f"[ :) ] Tape {tape_path} sorted!")
print(f"[ -v ] Sorting metadata:")
print(f"[ .. ] Phase count: {sort_info.phases_count}")
print(f"[ .. ] Reads count: {sort_info.reads_count}")
print(f"[ .. ] Writes count: {sort_info.writes_count}")
case ["load", tape_path, test_file_path]:
wb = WriteBuffer(tape_path, append_mode=True)
count = 0
with open(test_file_path) as test_file:
for line in test_file:
set_numbers = [int(s) for s in line.rstrip().split()]
new_record = Record(set_numbers)
wb.write_next(new_record)
count += 1
wb.flush()
print(f"[ :) ] Added {count} records to tape")
case ["help"]:
print(help_page)
case ["exit"]:
print("[ :) ] Goodbye!")
should_run = False
case _:
print("[ :( ] I don't know such a command")