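"""Inference wrappers for the trained French NER models.

Exposes a common NERModel interface with three implementations: an LSTM that
combines word tokens with POS-tag features, a plain BiLSTM, and a fine-tuned
CamemBERT token-classification model.
"""
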
from abc import ABC, abstractmethod
import os
import tensorflow as tf
import numpy as np
from transformers import TFCamembertForTokenClassification, CamembertTokenizerFast
import pandas as pd
from .data_processing import (
    process_sentence,
    encode_and_pad_sentence,
    encode_and_pad_sentence_pos,
)
from .metrics import masked_loss, masked_accuracy, entity_accuracy
import stanza
from .models_definitions.bilstm.architecture import BiLSTM
from .models_definitions.lstm_with_pos.architecture import LSTM

# Stanza pipeline for French tokenization and POS tagging, loaded once at
# import time; LSTM_NER.encode_sentence uses it to derive UPOS features.
nlp = stanza.Pipeline("fr", processors="tokenize,pos")


class NERModel(ABC):
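    """Common interface for the NER models; loads the shared vocabulary and
    POS-tag mappings from the pickled files next to this module."""
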
    file_path = os.path.dirname(os.path.abspath(__file__))
    vocab_path = os.path.join(file_path, "vocab.pkl")
    pos_tags_path = os.path.join(file_path, "pos_tags.pkl")
    vocab = pd.read_pickle(vocab_path)
    pos_tags = pd.read_pickle(pos_tags_path)

    @abstractmethod
    def get_entities(self, text: str):
        pass

    @abstractmethod
    def predict(self, encoded_sentence):
        pass

    @abstractmethod
    def encode_sentence(self, sentence: str):
        pass


class LSTM_NER(NERModel):
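    """LSTM NER model that combines word tokens with Stanza UPOS-tag features."""
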
    def __init__(self):
        self.model_weights_path = os.path.join(
            self.file_path,
            "models_definitions",
            "lstm_with_pos",
            "lstm_with_pos.weights.h5",
        )
        self.model = LSTM(self.vocab, 3, self.pos_tags)
        self.model.load_from_weights(self.model_weights_path)

    def encode_sentence(self, sentence: str):
        processed_sentence = process_sentence(
            sentence, stemming=True, return_tokens=True
        )
        encoded_sentence = tf.convert_to_tensor(
            [encode_and_pad_sentence(processed_sentence, self.vocab, max_length=100)]
        )
        doc = nlp(sentence)
        upos_tags = [word.upos for sent in doc.sentences for word in sent.words]
        encoded_pos = tf.convert_to_tensor(
            [encode_and_pad_sentence_pos(upos_tags, self.pos_tags, max_length=100)]
        )
        return [encoded_sentence, encoded_pos]

    def get_entities(self, text: str):
        encoded_sentence = self.encode_sentence(text)
        predictions = self.predict(encoded_sentence)
        return predictions[0].numpy()

    def predict(self, encoded_sentence):
        return tf.math.argmax(self.model.predict(encoded_sentence, verbose=0), axis=-1)


class BiLSTM_NER(NERModel):
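    """BiLSTM NER model operating on word tokens only."""
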
    def __init__(self):
        self.model_weights_path = os.path.join(
            self.file_path, "models_definitions", "bilstm", "bilstm.weights.h5"
        )
        self.model = BiLSTM(self.vocab, 3)
        self.model.load_from_weights(self.model_weights_path)

    def encode_sentence(self, sentence: str):
        processed_sentence = process_sentence(
            sentence, stemming=True, return_tokens=True
        )
        encoded_sentence = tf.convert_to_tensor(
            [encode_and_pad_sentence(processed_sentence, self.vocab, max_length=100)]
        )
        return encoded_sentence

    def get_entities(self, text: str):
        encoded_sentence = self.encode_sentence(text)
        predictions = self.predict(encoded_sentence)
        return predictions[0].numpy()

    def predict(self, encoded_sentence):
        return tf.math.argmax(self.model.predict(encoded_sentence, verbose=0), axis=-1)


class CamemBERT_NER(NERModel):
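    """Token-classification NER model loaded from the Az-r-ow/CamemBERT-NER-Travel
    checkpoint, using the DistilCamemBERT tokenizer."""
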
    def __init__(self, num_labels=3):
        self.model = TFCamembertForTokenClassification.from_pretrained(
            "Az-r-ow/CamemBERT-NER-Travel", num_labels=num_labels
        )

        self.tokenizer = CamembertTokenizerFast.from_pretrained(
            "cmarkea/distilcamembert-base"
        )

    def encode_sentence(self, sentence: str):
        return self.tokenizer(
            sentence,
            return_tensors="tf",
            padding="max_length",
            max_length=150,
        )

    def get_entities(self, text: str):
        encoded_sentence = self.encode_sentence(text)
        predictions = self.predict(encoded_sentence).logits
        predictions = tf.math.argmax(predictions, axis=-1)[0].numpy()
        return self.align_labels_with_original_sentence(
            encoded_sentence, [predictions]
        )[0]

    def predict(self, encoded_sentence):
        return self.model.predict(encoded_sentence)

    def align_labels_with_original_sentence(self, tokenized_inputs, predictions):
        """
        Aligns predictions from token classification back to the original sentence words.

        Args:
            tokenized_inputs (BatchEncoding): Tokenized input from CamembertTokenizerFast.
            predictions (np.array): Model predictions, shape (batch_size, seq_len, num_labels).

        Returns:
            List[List[str]]: Adjusted labels for each word in the original sentences.
        """
        aligned_labels = []

        for i in range(len(predictions)):  # Iterate through each example in the batch
            word_ids = tokenized_inputs.word_ids(
                batch_index=i
            )  # Get word IDs for this example
            sentence_labels = []
            current_word = None
            word_label = None

            for token_idx, word_idx in enumerate(word_ids):
                # Skip special tokens where word_idx is None
                if word_idx is None:
                    continue

                # If we're at a new word
                if word_idx != current_word:
                    # Append label for the completed word
                    if current_word is not None:
                        sentence_labels.append(word_label)

                    # Reset for the new word
                    current_word = word_idx
                    word_label = predictions[i][token_idx]

                else:
                    # Subsequent subword tokens of the same word are skipped, so
                    # the first subword's label stands for the whole word; the
                    # last subword's label could be used instead by updating
                    # word_label here.
                    continue

            # Append the last word's label
            if current_word is not None:
                sentence_labels.append(word_label)

            aligned_labels.append(sentence_labels)

        return aligned_labels
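

# Minimal usage sketch (illustrative only; the example sentence is an
# assumption). Because of the relative imports above, run this as a module
# inside its package (python -m <package>.<this_module>). The LSTM/BiLSTM
# variants additionally need the pickled vocab/POS files and trained weights.
if __name__ == "__main__":
    ner = CamemBERT_NER()
    word_labels = ner.get_entities("Je veux aller de Paris à Marseille demain.")
    print(word_labels)  # one predicted label ID per word of the input sentence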