import os
import re
import struct
import subprocess
from collections import Counter


class Token:
    """A vocabulary entry: a single byte plus the index of the token it extends."""

    def __init__(self, byte, prev):
        self.byte = byte
        self.prev = prev

    def pack(self):
        if not 0 <= ord(self.byte) <= 255:
            raise ValueError(f"Byte value is out of range, got {self.byte} ({ord(self.byte)})")

        # One unsigned byte plus an unsigned 16-bit prev index, standard
        # sizes with no padding: 3 bytes per token.
        return struct.pack("=B H", ord(self.byte), self.prev)

    def __str__(self):
        return f"{self.byte}, {self.prev}"

    def to_binary(self):
        return self.pack()
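
    @classmethod
    def from_binary(cls, data):
        # Counterpart to pack(), sketched from how from_file() uses it:
        # decodes the same 3-byte "=B H" layout and raises ValueError on a
        # short read so the caller can treat it as end-of-file.
        if len(data) != 3:
            raise ValueError(f"Expected 3 bytes of token data, got {len(data)}")

        byte, prev = struct.unpack("=B H", data)
        return cls(chr(byte), prev)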


class Tokenizer:
    def __init__(self):
        # Base vocabulary: one single-byte token for every byte value 0-255,
        # each chain terminating at prev index 0.
        self.vocab = [Token(chr(i), 0) for i in range(256)]
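
    def clear(self):
        # Assumed helper for from_file(): empty the vocabulary so the loaded
        # file rebuilds it exactly (to_file() also writes the 256 base tokens).
        self.vocab = []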

    def find(self, byte, prev):
        # Look for a token that extends `prev` with `byte`. Such a token can
        # only have been appended after `prev` existed, so its index is at
        # least `prev`; start the scan there. Returns 0 when nothing matches.
        for i in range(prev, self.vocab_size):
            token = self.vocab[i]
            if token.byte == byte and token.prev == prev:
                return i

        return 0

    def append(self, byte, prev):
        # Reuse an existing (byte, prev) token if one exists, otherwise add it.
        token = self.find(byte, prev)
        if token:
            return token

        self.vocab.append(Token(byte, prev))
        return self.vocab_size - 1

    def encode_one(self, text):
        # Greedily extend the current chain one character at a time until the
        # vocabulary has no continuation, then return the chain's token id and
        # the unconsumed remainder. Assumes characters in the 0-255 range.
        prev = 0

        for i in range(len(text)):
            byte = text[i]
            token = self.find(byte, prev)

            if token == 0:
                return prev, text[i:]

            prev = token

        return prev, ''

    def encode(self, text):
        ids = []

        while text:
            token, text = self.encode_one(text)
            ids.append(token)

        return ids

    def decode_one(self, token):
        # Walk the prev-chain backwards, collecting bytes, then reverse.
        text = ""

        while token:
            text += self.vocab[token].byte
            token = self.vocab[token].prev

        return text[::-1]

    def decode(self, ids):
        text = ""

        for token in ids:
            text += self.decode_one(token)

        return text

    def add_special(self, text):
        # Register `text` as a single chain: start from the single-byte token
        # of the first character (its index equals its byte value) and append
        # one continuation token per remaining character.
        token = ord(text[0])
        for byte in text[1:]:
            token = self.append(byte, token)

    @property
    def vocab_size(self):
        return len(self.vocab)

    def __str__(self):
        return '[' + ', '.join(str(token) for token in self.vocab) + ']'

    def to_file(self, file):
        # Write the whole vocabulary; 'wb' (rather than append) so repeated
        # exports do not pile stale tokens into the same file.
        with open(file, 'wb') as f:
            for token in self.vocab:
                f.write(token.to_binary())

    def from_file(self, file):
        self.clear()
        with open(file, 'rb') as f:
            while True:
                try:
                    data = f.read(3)
                    token = Token.from_binary(data)
                    self.vocab.append(token)
                except ValueError:
                    break

    def train(self, text, max_length=32000):
        # Build "words": split on whitespace, keep only word characters, and
        # prefix each with a space so tokens align on word boundaries.
        words = text.split()
        words = [' ' + ''.join(re.findall(r'\w', word)) for word in words]
        words = [word for word in words if len(word) >= 2]

        # Add the most frequent words first (ties broken alphabetically)
        # until the vocabulary size exceeds max_length.
        word_freq = Counter(words)
        sorted_words = sorted(word_freq, key=lambda x: (-word_freq[x], x))

        for word in sorted_words:
            if self.vocab_size > max_length:
                break

            self.add_special(word)
            print(f"adding word: {word} | current vocab size: {self.vocab_size} | max length: {max_length}")

    def c_compile(self, c_dir):
        # Build the companion C tokenizer via its Makefile.
        subprocess.run(['make'], cwd=c_dir)

    def c_run(self, c_dir, c_data, c_out):
        # Run the compiled tokenizer: reads the text in c_data and writes
        # token ids to c_out.
        subprocess.run(['./a.out', c_data, c_out], cwd=c_dir)

    def load_binary_file(self, file_path):
        with open(file_path, 'rb') as file:
            data = file.read()

        # The C tokenizer emits one unsigned 16-bit id per token.
        num_values = len(data) // 2
        values = struct.unpack(f'{num_values}H', data)
        return list(values)

    def c_encode(self, text):
        # Encode via the C tokenizer: dump the current vocabulary and the text
        # to disk, compile and run the C program, then read back the ids.
        script_dir = os.path.dirname(__file__)
        c_dir = os.path.join(script_dir, 'c_tokenizer/')

        c_vocab = c_dir + 'tokenizer.bin'
        c_data = c_dir + 'dataset.txt'
        c_out = c_dir + 'dataset.bin'

        with open(c_data, 'w') as f:
            f.write(text)

        self.to_file(c_vocab)
        self.c_compile(c_dir)
        self.c_run(c_dir, c_data, c_out)

        ids = self.load_binary_file(c_out)

        return ids
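

if __name__ == "__main__":
    # Usage sketch: train on a small in-memory corpus, then round-trip a
    # string through the pure-Python encode/decode path. The sample text and
    # the max_length value are illustrative only.
    tokenizer = Tokenizer()
    tokenizer.train("the quick brown fox jumps over the lazy dog " * 4, max_length=300)

    ids = tokenizer.encode(" the quick brown fox")
    print(ids)
    print(tokenizer.decode(ids))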