from collections import Counter
import struct
import re
import os
import subprocess
class Token:
    """One vocab entry: a byte plus the index of the token it extends."""
    def __init__(self, byte, prev):
        self.byte = byte
        self.prev = prev
    def pack(self):
        if not 0 <= ord(self.byte) <= 255:
            raise ValueError(f"Byte value is out of range, got {self.byte} ({ord(self.byte)})")
        # "=B H" packs to exactly 3 bytes: uint8 byte value + uint16 prev index
        return struct.pack("=B H", ord(self.byte), self.prev)
    @classmethod
    def from_binary(cls, data):
        # Inverse of pack(); raises ValueError on a short read so that
        # Tokenizer.from_file() can detect end-of-file.
        if len(data) < 3:
            raise ValueError("Incomplete token record")
        byte, prev = struct.unpack("=B H", data)
        return cls(chr(byte), prev)
    def __str__(self):
        return f"{self.byte}, {self.prev}"
    def to_binary(self):
        return self.pack()
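# Hedged round-trip sketch (not part of the original file): each Token packs
# to exactly 3 bytes, which is why Tokenizer.from_file() below reads the
# vocab back 3 bytes at a time.
#
#   t = Token('a', 0)
#   data = t.pack()                        # b'a\x00\x00'
#   assert Token.from_binary(data).byte == 'a'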
class Tokenizer:
    def __init__(self):
        self.vocab = [Token(chr(i), 0) for i in range(256)]  # base vocab: one token per byte value 0-255
    def find(self, byte, prev):
        # Return the index of the token that extends `prev` with `byte`,
        # or 0 as a "not found" sentinel. A learned token always sits after
        # its `prev`, so the search can start there.
        for i in range(prev, self.vocab_size):
            token = self.vocab[i]
            if token.byte == byte and token.prev == prev:
                return i
        return 0
    def append(self, byte, prev):
        # Reuse an existing token if the (byte, prev) pair is already known.
        token = self.find(byte, prev)
        if token:
            return token
        self.vocab.append(Token(byte, prev))
        return self.vocab_size - 1
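    # Sketch of how chains grow (illustrative, not from the original code):
    # appending 'b' after the base token 'a' (index ord('a') == 97) creates a
    # new token whose decode_one() walk spells "ab".
    #
    #   tok = Tokenizer()
    #   idx = tok.append('b', ord('a'))    # 256, the first learned token
    #   assert tok.decode_one(idx) == "ab"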
    def encode_one(self, text):
        # Greedy longest match: follow the byte->prev chains until no token
        # extends the current one, then return that token and the rest.
        prev = 0
        for i in range(len(text)):
            token = self.find(text[i], prev)
            if token == 0:
                if i == 0:
                    # Unknown character (e.g. a code point above 255): skip it
                    # so encode() always makes progress; the sentinel 0 decodes
                    # to an empty string. (Returning the unshrunk text here
                    # would loop forever.)
                    return prev, text[1:]
                return prev, text[i:]
            prev = token
        return prev, ''
    def encode(self, text):
        ids = []
        while text:
            token, text = self.encode_one(text)
            ids.append(token)
        return ids
    def decode_one(self, token):
        # Walk the prev chain back to 0, collecting bytes in reverse order.
        text = ""
        while token:
            text += self.vocab[token].byte
            token = self.vocab[token].prev
        return text[::-1]
    def decode(self, ids):
        text = ""
        for token in ids:
            text += self.decode_one(token)
        return text
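    # Usage sketch (assumed, not from the original file): with just the base
    # vocab, encode() maps each character to its byte-value index and
    # decode() inverts it exactly.
    #
    #   tok = Tokenizer()
    #   ids = tok.encode("hi")             # [104, 105] == [ord('h'), ord('i')]
    #   assert tok.decode(ids) == "hi"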
    def add_special(self, text):
        # Grow the vocab with a chain spelling out `text`; the base token for
        # a byte sits at index ord(byte), so the chain starts there.
        token = ord(text[0])
        for byte in text[1:]:
            token = self.append(byte, token)
    @property
    def vocab_size(self):
        return len(self.vocab)
    def __str__(self):
        return '[' + ', '.join(str(token) for token in self.vocab) + ']'
    def to_file(self, file):
        # 'wb' rather than 'ab': appending would stack duplicate vocabs in
        # the file on repeated saves.
        with open(file, 'wb') as f:
            for token in self.vocab:
                f.write(token.to_binary())
    def from_file(self, file):
        self.vocab = []
        with open(file, 'rb') as f:
            while True:
                try:
                    token = Token.from_binary(f.read(3))
                except ValueError:
                    break
                self.vocab.append(token)
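    # Save/load sketch (file name is hypothetical): the base vocab alone
    # serializes to 256 tokens * 3 bytes = 768 bytes.
    #
    #   tok = Tokenizer()
    #   tok.to_file("vocab.bin")
    #   tok2 = Tokenizer()
    #   tok2.from_file("vocab.bin")
    #   assert tok2.vocab_size == 256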
    def train(self, text, max_length=32000):
        # Build the vocab from the most frequent words: each word keeps a
        # leading space and is stripped down to word characters.
        words = text.split()
        words = [' ' + ''.join(re.findall(r'\w', word)) for word in words]
        words = [word for word in words if len(word) >= 2]
        word_freq = Counter(words)
        sorted_words = sorted(word_freq, key=lambda x: (-word_freq[x], x))
        for word in sorted_words:
            if self.vocab_size > max_length:
                break
            self.add_special(word)
            print(f"adding word: {word} | current vocab size: {self.vocab_size} | max length: {max_length}")
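    # Training sketch (illustrative corpus): each frequent word is stored as
    # a prefix chain with its leading space, e.g. " the" becomes the links
    # (' '->'t'), ('t'->'h'), ('h'->'e').
    #
    #   tok = Tokenizer()
    #   tok.train("the cat and the hat", max_length=300)
    #   assert tok.vocab_size > 256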
    # Weak part of the project. Maybe implement a handler?
    def c_compile(self, c_dir):
        # check=True turns a failed build into an exception instead of
        # silently continuing with a stale binary.
        subprocess.run(['make'], cwd=c_dir, check=True)
    def c_run(self, c_dir, c_data, c_out):
        subprocess.run(['./a.out', c_data, c_out], cwd=c_dir, check=True)
    def load_binary_file(self, file_path):
        with open(file_path, 'rb') as file:
            data = file.read()
        # Each token id is a uint16_t (2 bytes)
        num_values = len(data) // 2
        values = struct.unpack(f'={num_values}H', data)
        return list(values)
    def c_encode(self, text):
        # Round-trip through the C tokenizer: dump the vocab and the text to
        # disk, build and run the C program, then read back the uint16 ids.
        script_dir = os.path.dirname(__file__)
        c_dir = os.path.join(script_dir, 'c_tokenizer/')
        c_vocab = c_dir + 'tokenizer.bin'
        c_data = c_dir + 'dataset.txt'
        c_out = c_dir + 'dataset.bin'
        with open(c_data, 'w') as f:
            f.write(text)
        self.to_file(c_vocab)
        self.c_compile(c_dir)
        self.c_run(c_dir, c_data, c_out)
        ids = self.load_binary_file(c_out)
        return ids
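# Hedged end-to-end demo of the pure-Python path (the c_encode() path also
# needs the c_tokenizer/ Makefile from the repo, so it is not exercised here).
if __name__ == "__main__":
    tok = Tokenizer()
    sample = "the cat sat on the mat and the cat sat"
    tok.train(sample, max_length=300)
    ids = tok.encode(sample)
    assert tok.decode(ids) == sample   # greedy chains round-trip exactly
    print(f"{len(sample)} chars -> {len(ids)} tokens")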