File size: 19,037 Bytes
fd09229
d958a06
 
 
 
 
 
f0a45ec
d958a06
be449ca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d958a06
 
 
 
 
 
be449ca
d958a06
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fd09229
d958a06
 
 
fd09229
d958a06
 
 
 
 
 
 
f0a45ec
d958a06
 
 
 
 
 
 
f0a45ec
d958a06
 
 
 
 
 
 
 
 
 
 
 
be449ca
d958a06
 
 
 
 
571cb14
 
 
 
 
d958a06
 
 
 
 
 
 
 
f0a45ec
d958a06
 
 
 
 
 
 
 
 
 
 
 
fd09229
d958a06
 
fd09229
 
 
 
 
d958a06
 
 
571cb14
d958a06
 
 
 
1d4a25c
 
d958a06
f0a45ec
 
 
 
d958a06
 
 
 
 
 
f0a45ec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d958a06
b6645f5
d958a06
be449ca
b6645f5
 
 
 
be449ca
b6645f5
 
 
 
 
 
 
 
be449ca
b6645f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
be449ca
 
b6645f5
be449ca
 
b6645f5
 
 
 
 
be449ca
 
b6645f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
be449ca
 
b6645f5
1d4a25c
 
be449ca
1d4a25c
 
 
be449ca
 
1d4a25c
 
be449ca
b6645f5
 
 
be449ca
1d4a25c
b6645f5
 
 
 
 
1d4a25c
 
be449ca
b6645f5
 
1d4a25c
 
be449ca
b6645f5
 
1d4a25c
 
be449ca
b6645f5
 
1d4a25c
 
be449ca
b6645f5
1d4a25c
 
be449ca
b6645f5
1d4a25c
d958a06
1d4a25c
 
 
 
 
 
 
77cc30a
 
1d4a25c
 
 
d958a06
1d4a25c
 
 
d958a06
77cc30a
 
ddca90f
77cc30a
 
 
ddca90f
77cc30a
 
ddca90f
77cc30a
 
 
 
 
 
ddca90f
1d4a25c
 
 
77cc30a
 
ddca90f
77cc30a
 
 
 
 
 
 
 
1d4a25c
77cc30a
1d4a25c
 
 
 
77cc30a
 
1d4a25c
 
77cc30a
1d4a25c
77cc30a
 
1d4a25c
 
 
 
 
 
 
77cc30a
d958a06
1d4a25c
 
d958a06
77cc30a
 
ddca90f
77cc30a
 
 
 
 
ddca90f
77cc30a
 
 
ddca90f
77cc30a
 
 
d958a06
1d4a25c
d958a06
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
from transformers import AutoProcessor, AutoModelForImageTextToText
from PIL import Image
import torch
import logging
from typing import Union, Tuple
from config import Config
from knowledge_base import GarbageClassificationKnowledge
import re


def preprocess_image(image: Image.Image) -> Image.Image:
    """
    Preprocess image to meet Gemma3n requirements (512x512)
    """
    # Convert to RGB if necessary
    if image.mode != "RGB":
        image = image.convert("RGB")

    # Resize to 512x512 as required by Gemma3n
    target_size = (512, 512)

    # Calculate aspect ratio preserving resize
    original_width, original_height = image.size
    aspect_ratio = original_width / original_height

    if aspect_ratio > 1:
        # Width is larger
        new_width = target_size[0]
        new_height = int(target_size[0] / aspect_ratio)
    else:
        # Height is larger or equal
        new_height = target_size[1]
        new_width = int(target_size[1] * aspect_ratio)

    # Resize image maintaining aspect ratio
    image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)

    # Create a new image with target size and paste the resized image
    processed_image = Image.new(
        "RGB", target_size, (255, 255, 255)
    )  # White background

    # Calculate position to center the image
    x_offset = (target_size[0] - new_width) // 2
    y_offset = (target_size[1] - new_height) // 2

    processed_image.paste(image, (x_offset, y_offset))

    return processed_image


class GarbageClassifier:
    def __init__(self, config: Config = None):
        self.config = config or Config()
        self.knowledge = GarbageClassificationKnowledge()
        self.processor = None
        self.model = None
        # self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Setup logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def load_model(self):
        """Load the model and processor"""
        try:
            self.logger.info(f"Loading model: {self.config.MODEL_NAME}")

            # Load processor
            kwargs = {}
            if self.config.HF_TOKEN:
                kwargs["token"] = self.config.HF_TOKEN

            self.processor = AutoProcessor.from_pretrained(
                self.config.MODEL_NAME, **kwargs
            )

            # Load model
            self.model = AutoModelForImageTextToText.from_pretrained(
                self.config.MODEL_NAME,
                torch_dtype=self.config.TORCH_DTYPE,
                device_map=self.config.DEVICE_MAP,
            )

            self.logger.info("Model loaded successfully")

        except Exception as e:
            self.logger.error(f"Error loading model: {str(e)}")
            raise

    def classify_image(self, image: Union[str, Image.Image]) -> Tuple[str, str, int]:
        """
        Classify garbage in the image

        Args:
            image: PIL Image or path to image file

        Returns:
            Tuple of (classification_result, detailed_analysis, confidence_score)
        """
        if self.model is None or self.processor is None:
            raise RuntimeError("Model not loaded. Call load_model() first.")

        try:
            # Load and process image
            if isinstance(image, str):
                image = Image.open(image)
            elif not isinstance(image, Image.Image):
                raise ValueError("Image must be a PIL Image or file path")

            # Preprocess image to meet Gemma3n requirements
            processed_image = preprocess_image(image)

            # Prepare messages with system prompt and user query
            messages = [
                {
                    "role": "system",
                    "content": [
                        {
                            "type": "text",
                            "text": self.knowledge.get_system_prompt(),
                        }
                    ],
                },
                {
                    "role": "user",
                    "content": [
                        {"type": "image", "image": processed_image},
                        {
                            "type": "text",
                            "text": "Please classify what you see in this image. If it shows garbage/waste items, classify them according to the garbage classification standards. If it shows people, living things, or other non-waste items, classify it as 'Unable to classify' and explain why it's not garbage. Also provide a confidence score from 1-10 indicating how certain you are about your classification.",
                        },
                    ],
                },
            ]

            # Apply chat template and tokenize
            inputs = self.processor.apply_chat_template(
                messages,
                add_generation_prompt=True,
                tokenize=True,
                return_dict=True,
                return_tensors="pt",
            ).to(self.model.device, dtype=self.model.dtype)
            input_len = inputs["input_ids"].shape[-1]

            outputs = self.model.generate(
                **inputs,
                max_new_tokens=self.config.MAX_NEW_TOKENS,
                disable_compile=True,
            )
            response = self.processor.batch_decode(
                outputs[:, input_len:],
                skip_special_tokens=True,
            )[0]

            # Extract classification from response
            classification = self._extract_classification(response)

            # Extract reasoning from response
            reasoning = self._extract_reasoning(response)

            # Extract confidence score from response
            confidence_score = self._extract_confidence_score(response, classification)

            return classification, reasoning, confidence_score

        except Exception as e:
            self.logger.error(f"Error during classification: {str(e)}")
            import traceback

            traceback.print_exc()
            return "Error", f"Classification failed: {str(e)}", 0


    def _calculate_confidence_heuristic(self, response_lower: str, classification: str) -> int:
        """Calculate confidence based on response content and classification type"""
        base_confidence = 5

        # Confidence indicators (increase confidence)
        high_confidence_words = ["clearly", "obviously", "definitely", "certainly", "exactly"]
        medium_confidence_words = ["appears", "seems", "likely", "probably"]

        # Uncertainty indicators (decrease confidence)
        uncertainty_words = ["might", "could", "possibly", "maybe", "unclear", "difficult"]

        # Adjust based on confidence words
        for word in high_confidence_words:
            if word in response_lower:
                base_confidence += 2
                break

        for word in medium_confidence_words:
            if word in response_lower:
                base_confidence += 1
                break

        for word in uncertainty_words:
            if word in response_lower:
                base_confidence -= 2
                break

        # Classification-specific adjustments
        if classification == "Unable to classify":
            if any(indicator in response_lower for indicator in ["person", "people", "human", "living"]):
                base_confidence += 1  # High confidence when clearly not waste
            else:
                base_confidence -= 1  # Lower confidence for unclear items

        elif classification == "Error":
            base_confidence = 1

        else:
            # Check for specific material mentions (increases confidence)
            specific_materials = ["aluminum", "plastic", "glass", "metal", "cardboard", "paper"]
            if any(material in response_lower for material in specific_materials):
                base_confidence += 1

        return min(max(base_confidence, 1), 10)

    def _extract_confidence_score(self, response: str, classification: str) -> int:
        """Extract confidence score from response or calculate based on classification"""
        response_lower = response.lower()

        # Look for explicit confidence scores in the response
        confidence_patterns = [
            r'confidence[:\s]*(\d+)',
            r'confident[:\s]*(\d+)',
            r'certainty[:\s]*(\d+)',
            r'score[:\s]*(\d+)',
            r'(\d+)/10',
            r'(\d+)\s*out\s*of\s*10'
        ]

        for pattern in confidence_patterns:
            match = re.search(pattern, response_lower)
            if match:
                score = int(match.group(1))
                return min(max(score, 1), 10)  # Clamp between 1-10

        # If no explicit score found, calculate based on classification indicators
        return self._calculate_confidence_heuristic(response_lower, classification)

    def _extract_classification(self, response: str) -> str:
        """Extract the main classification from the response with STRICT mixed garbage enforcement"""
        response_lower = response.lower()

        # STRICT MIXED GARBAGE ENFORCEMENT - Catch ANY mixed scenario

        # 1. Explicit mixed garbage phrases
        explicit_mixed_phrases = [
            "multiple garbage types",
            "multiple different",
            "different types of garbage",
            "various items",
            "mixed items",
            "several different",
            "collection of mixed items",
            "mixture of items",
            "variety of items",
            "separate items",
            "please separate"
        ]

        if any(phrase in response_lower for phrase in explicit_mixed_phrases):
            return "Unable to classify"

        # 2. Language patterns that indicate multiple items/uncertainty about classification
        uncertainty_patterns = [
            "appears to be containers",
            "what appears to be",
            "including what appears",
            "various colors and textures",
            "don't clearly fall into a single",
            "without further detail",
            "not possible to definitively classify",
            "more information",
            "can't determine",
            "difficult to identify",
            "unclear category",
            "mixed materials"
        ]

        if any(pattern in response_lower for pattern in uncertainty_patterns):
            return "Unable to classify"

        # 3. Multiple container/item indicators
        multiple_item_indicators = [
            "containers (", "bottles, cans", "bags, and", "items, including",
            "bottles and", "cans and", "containers and", "bags and",
            "plastic bottles, cans", "various containers"
        ]

        if any(indicator in response_lower for indicator in multiple_item_indicators):
            return "Unable to classify"

        # 4. Count different item types mentioned
        item_types = [
            "bottle", "can", "container", "bag", "box", "wrapper",
            "jar", "cup", "plate", "bowl", "package"
        ]

        item_count = sum(1 for item_type in item_types if item_type in response_lower)
        if item_count >= 3:  # If 3+ different container types mentioned, it's mixed
            return "Unable to classify"

        # ONLY EXCEPTION: Single recyclable container with visible food content
        recyclable_container_indicators = ["container", "bottle", "can", "jar", "box", "wrapper"]
        food_content_indicators = [
            "food residue", "food content", "food inside", "visible food",
            "remains", "leftovers", "scraps inside", "not empty", "not rinsed"
        ]
        recyclable_material_indicators = ["plastic", "aluminum", "glass", "metal", "cardboard"]

        # Check for recycling tip warning
        has_recycling_tip = any(tip in response_lower for tip in [
            "tip: empty and rinse",
            "empty and rinse this container",
            "clean first", "rinse first"
        ])

        # ONLY allow Food/Kitchen classification for single contaminated container
        has_single_container = any(indicator in response_lower for indicator in recyclable_container_indicators)
        has_food_content = any(indicator in response_lower for indicator in food_content_indicators)
        has_recyclable_material = any(indicator in response_lower for indicator in recyclable_material_indicators)

        # Must be single item (not multiple) and contaminated
        if (has_single_container and has_food_content and
                (has_recyclable_material or has_recycling_tip) and
                item_count <= 1):  # Only single container
            return "Food/Kitchen Waste"

        # Now proceed with normal classification for single, clear items
        categories = self.knowledge.get_categories()
        waste_categories = [cat for cat in categories if cat != "Unable to classify"]

        for category in waste_categories:
            if category.lower() in response_lower:
                category_index = response_lower.find(category.lower())
                context_before = response_lower[max(0, category_index - 30):category_index]

                if not any(neg in context_before[-10:] for neg in ["not", "cannot", "isn't", "doesn't"]):
                    return category

        # Single item material detection
        recyclable_indicators = ["recyclable", "recycle", "aluminum", "plastic", "glass", "metal", "foil", "cardboard",
                                 "paper"]

        if any(indicator in response_lower for indicator in recyclable_indicators):
            if not any(cont in response_lower for cont in food_content_indicators):
                return "Recyclable Waste"

        # Food waste indicators
        food_indicators = ["food", "fruit", "vegetable", "organic", "kitchen waste", "peel", "core", "scraps"]
        if any(indicator in response_lower for indicator in food_indicators):
            return "Food/Kitchen Waste"

        # Hazardous waste indicators
        hazardous_indicators = ["battery", "chemical", "medicine", "paint", "toxic", "hazardous"]
        if any(indicator in response_lower for indicator in hazardous_indicators):
            return "Hazardous Waste"

        # Other waste indicators
        other_waste_indicators = ["cigarette", "ceramic", "dust", "diaper", "tissue"]
        if any(indicator in response_lower for indicator in other_waste_indicators):
            return "Other Waste"

        # Non-garbage detection
        unable_phrases = ["unable to classify", "cannot classify", "not garbage", "not waste"]
        if any(phrase in response_lower for phrase in unable_phrases):
            return "Unable to classify"

        non_garbage_indicators = ["person", "people", "human", "face", "living", "animal", "pet"]
        if any(indicator in response_lower for indicator in non_garbage_indicators):
            return "Unable to classify"

        # Default fallback
        return "Unable to classify"

    def _extract_reasoning(self, response: str) -> str:
        """Extract only the reasoning content, removing all formatting markers and classification info"""
        import re

        # Remove all formatting markers
        cleaned_response = response.replace("**Classification**:", "")
        cleaned_response = cleaned_response.replace("**Reasoning**:", "")
        cleaned_response = re.sub(r'\*\*.*?\*\*:', '', cleaned_response)  # Remove any **text**: patterns
        cleaned_response = cleaned_response.replace("**", "")  # Remove remaining ** markers

        # Remove category names that might appear at the beginning
        categories = self.knowledge.get_categories()
        for category in categories:
            if cleaned_response.strip().startswith(category):
                cleaned_response = cleaned_response.replace(category, "", 1)
                break

        # Remove common material names that might appear at the beginning
        material_names = [
            "Glass", "Plastic", "Metal", "Paper", "Cardboard", "Aluminum",
            "Steel", "Iron", "Tin", "Foil", "Wood", "Ceramic", "Fabric",
            "Recyclable Waste", "Food/Kitchen Waste", "Hazardous Waste", "Other Waste"
        ]

        # Clean the response
        cleaned_response = cleaned_response.strip()

        # Remove material names at the beginning
        for material in material_names:
            if cleaned_response.startswith(material):
                # Remove the material name and any following punctuation/whitespace
                cleaned_response = cleaned_response[len(material):].lstrip(" .,;:")
                break

        # Split into sentences and clean up
        sentences = []

        # Split by common sentence endings, but keep the endings
        parts = re.split(r'([.!?])\s+', cleaned_response)

        # Rejoin parts to maintain sentence structure
        reconstructed_parts = []
        for i in range(0, len(parts), 2):
            if i < len(parts):
                sentence = parts[i]
                if i + 1 < len(parts):
                    sentence += parts[i + 1]  # Add the punctuation back
                reconstructed_parts.append(sentence)

        for part in reconstructed_parts:
            part = part.strip()
            if not part:
                continue

            # Skip parts that are just category names or material names
            if part in categories or part.rstrip(".,;:") in material_names:
                continue

            # Skip parts that start with category names or material names
            is_category_line = False
            for item in categories + material_names:
                if part.startswith(item):
                    is_category_line = True
                    break

            if is_category_line:
                continue

            # Clean up the sentence
            part = re.sub(r'^[A-Za-z\s]+:', '', part).strip()  # Remove "Category:" type prefixes

            if part and len(part) > 3:  # Only keep meaningful content
                sentences.append(part)

        # Join sentences
        reasoning = ' '.join(sentences)

        # Final cleanup - remove any remaining standalone material words at the beginning
        reasoning_words = reasoning.split()
        if reasoning_words and reasoning_words[0] in [m.lower() for m in material_names]:
            reasoning_words = reasoning_words[1:]
            reasoning = ' '.join(reasoning_words)

        # Ensure proper capitalization
        if reasoning:
            reasoning = reasoning[0].upper() + reasoning[1:] if len(reasoning) > 1 else reasoning.upper()

            # Ensure proper punctuation
            if not reasoning.endswith(('.', '!', '?')):
                reasoning += '.'

        return reasoning if reasoning else "Analysis not available"

    def get_categories_info(self):
        """Get information about all categories"""
        return self.knowledge.get_category_descriptions()