Spaces:
Running
Running
| from flask import Flask, request, jsonify, send_from_directory | |
| from flask_socketio import SocketIO, emit, join_room, leave_room | |
| import json | |
| import os | |
| import random | |
| import secrets | |
| from datetime import datetime | |
| from filelock import FileLock | |
| from functools import wraps | |
# Flask application and the Socket.IO server layered on top of it.
app = Flask(__name__, static_folder='static', template_folder='templates')
# Use SESSION_SECRET from the environment when set; otherwise fall back to a
# random key generated at import time.
app.config.update(
    SECRET_KEY=os.environ.get('SESSION_SECRET', secrets.token_hex(32)),
)
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='eventlet')

# JSON files used as the data store, plus the lock files guarding them.
DATA_DIR = 'data'
USERS_FILE = os.path.join(DATA_DIR, 'users.json')
MESSAGES_FILE = os.path.join(DATA_DIR, 'messages.json')
USERS_LOCK = FileLock(USERS_FILE + ".lock")
MESSAGES_LOCK = FileLock(MESSAGES_FILE + ".lock")

# Maps user_id -> Socket.IO session id of the authenticated connection.
active_users = {}
def init_data_files():
    """Create the data directory and seed the JSON stores when missing.

    The users file is seeded with an empty object and the messages file with
    an empty list. Existing files are left untouched.
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    seeds = (
        (USERS_FILE, USERS_LOCK, {}),
        (MESSAGES_FILE, MESSAGES_LOCK, []),
    )
    for path, lock, empty_value in seeds:
        with lock:
            # Check for the file while holding the lock: checking first and
            # locking afterwards leaves a window in which another process
            # can create and fill the file before this one truncates it.
            if not os.path.exists(path):
                with open(path, 'w') as f:
                    json.dump(empty_value, f, indent=2)
def read_json(file_path, lock):
    """Return the parsed JSON document stored at *file_path*.

    *lock* is entered as a context manager for the duration of the read.
    """
    with lock, open(file_path, 'r') as handle:
        return json.load(handle)
def write_json(file_path, data, lock):
    """Serialize *data* as indented JSON to *file_path* while holding *lock*.

    The document is first written to a sibling ``<file_path>.tmp`` file and
    then moved into place with ``os.replace``. Opening *file_path* directly
    in ``'w'`` mode truncates it immediately, so a crash or a serialization
    error part-way through would leave the store empty or half-written.

    Raises whatever ``json.dump`` or the file operations raise; in that case
    the existing *file_path* is left unchanged.
    """
    tmp_path = f"{file_path}.tmp"
    with lock:
        try:
            with open(tmp_path, 'w') as f:
                json.dump(data, f, indent=2)
                # Push the bytes to disk before the rename makes them live.
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, file_path)
        finally:
            # After a successful replace the temp file no longer exists;
            # this only cleans up after a failure.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
def generate_user_id():
    """Return a random 8-digit ID string that is not a key in the users file."""
    existing = read_json(USERS_FILE, USERS_LOCK)
    candidate = str(random.randint(10000000, 99999999))
    # Redraw until the candidate does not collide with a stored user.
    while candidate in existing:
        candidate = str(random.randint(10000000, 99999999))
    return candidate
def generate_token():
    """Return a fresh URL-safe token built from 32 random bytes."""
    token = secrets.token_urlsafe(32)
    return token
def generate_message_id():
    """Return a fresh URL-safe message identifier built from 16 random bytes."""
    message_id = secrets.token_urlsafe(16)
    return message_id
def get_user_by_token(token):
    """Look up the user record that owns *token*.

    Returns ``(user_id, user_data)`` for the matching record, or
    ``(None, None)`` when the token is empty, not a string, or unknown.
    """
    # Reject empty/non-string tokens up front: with a plain ``==`` check a
    # ``None`` token would match any record that has no 'token' field.
    if not token or not isinstance(token, str):
        return None, None
    supplied = token.encode('utf-8')
    users = read_json(USERS_FILE, USERS_LOCK)
    for user_id, user_data in users.items():
        stored = user_data.get('token')
        if not isinstance(stored, str):
            continue
        # Constant-time comparison so response timing does not reveal how
        # much of a guessed token was correct. Bytes are compared because
        # compare_digest rejects non-ASCII str arguments.
        if secrets.compare_digest(stored.encode('utf-8'), supplied):
            return user_id, user_data
    return None, None
def index():
    """Serve ``index.html`` from the ``templates`` directory."""
    page = 'index.html'
    return send_from_directory('templates', page)
def register():
    """Create a user account from a JSON body containing ``name`` and ``email``.

    Responds with the new ``user_id``, ``token``, ``name`` and ``email``.
    Responds with HTTP 400 when either field is missing or blank, or when the
    email is already registered.

    NOTE(review): no route decorator is visible for this handler in the
    reviewed source; confirm how it is registered.
    """
    # Tolerate a missing, malformed or non-object body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    name = data.get('name', '')
    email = data.get('email', '')
    # Non-string values are treated as missing rather than crashing .strip().
    name = name.strip() if isinstance(name, str) else ''
    email = email.strip() if isinstance(email, str) else ''
    if not name or not email:
        return jsonify({'error': 'Name and email are required'}), 400

    # Hold the lock across the whole read-check-write so two concurrent
    # registrations cannot both pass the duplicate check or overwrite each
    # other's record. FileLock is re-entrant, so the helpers called below
    # may acquire it again.
    with USERS_LOCK:
        users = read_json(USERS_FILE, USERS_LOCK)
        if any(u.get('email') == email for u in users.values()):
            return jsonify({'error': 'Email already registered'}), 400
        user_id = generate_user_id()
        token = generate_token()
        users[user_id] = {
            'name': name,
            'email': email,
            'token': token,
            'contacts': [],
            'created_at': datetime.now().isoformat(),
        }
        write_json(USERS_FILE, users, USERS_LOCK)

    return jsonify({
        'user_id': user_id,
        'token': token,
        'name': name,
        'email': email,
    })
def login():
    """Validate a ``user_id`` / ``token`` pair supplied in a JSON body.

    Responds with the user's ``user_id``, ``name``, ``email`` and
    ``contacts`` on success, HTTP 400 when a field is missing or blank, and
    HTTP 401 when the user ID is unknown or the token does not match.

    NOTE(review): no route decorator is visible for this handler in the
    reviewed source; confirm how it is registered.
    """
    # Tolerate a missing, malformed or non-object body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    user_id = data.get('user_id', '')
    token = data.get('token', '')
    # Non-string values are treated as missing rather than crashing .strip().
    user_id = user_id.strip() if isinstance(user_id, str) else ''
    token = token.strip() if isinstance(token, str) else ''
    if not user_id or not token:
        return jsonify({'error': 'User ID and token are required'}), 400

    users = read_json(USERS_FILE, USERS_LOCK)
    user_data = users.get(user_id)
    if user_data is None:
        return jsonify({'error': 'Invalid user ID'}), 401

    stored = user_data.get('token')
    # Constant-time comparison so timing does not leak token contents; bytes
    # are used because compare_digest rejects non-ASCII str arguments.
    token_ok = isinstance(stored, str) and secrets.compare_digest(
        stored.encode('utf-8'), token.encode('utf-8')
    )
    if not token_ok:
        return jsonify({'error': 'Invalid token'}), 401

    return jsonify({
        'user_id': user_id,
        'name': user_data['name'],
        'email': user_data['email'],
        'contacts': user_data.get('contacts', []),
    })
def add_contact():
    """Add ``contact_id`` to the contact list of the user owning ``token``.

    Responds with the contact's ``contact_id``, ``name`` and ``email``.
    Error responses: 401 for an unknown token, 400 for a missing contact ID,
    adding oneself or a duplicate contact, 404 for an unknown contact.

    NOTE(review): no route decorator is visible for this handler in the
    reviewed source; confirm how it is registered.
    """
    # Tolerate a missing, malformed or non-object body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    token = data.get('token', '')
    contact_id = data.get('contact_id', '')
    # Non-string values are treated as missing rather than crashing .strip().
    contact_id = contact_id.strip() if isinstance(contact_id, str) else ''

    user_id, _ = get_user_by_token(token)
    if not user_id:
        return jsonify({'error': 'Unauthorized'}), 401
    if not contact_id:
        return jsonify({'error': 'Contact ID is required'}), 400

    # Hold the lock across the read-modify-write so concurrent updates to the
    # users file are not lost. FileLock is re-entrant, so the helpers may
    # acquire it again.
    with USERS_LOCK:
        users = read_json(USERS_FILE, USERS_LOCK)
        if contact_id not in users:
            return jsonify({'error': 'User not found'}), 404
        if contact_id == user_id:
            return jsonify({'error': 'Cannot add yourself as contact'}), 400
        # Check duplicates against the snapshot that is about to be written,
        # not the copy fetched during authentication.
        existing_contacts = users[user_id].get('contacts', [])
        if contact_id in existing_contacts:
            return jsonify({'error': 'Contact already added'}), 400
        users[user_id]['contacts'] = existing_contacts + [contact_id]
        write_json(USERS_FILE, users, USERS_LOCK)
        contact_data = users[contact_id]

    return jsonify({
        'contact_id': contact_id,
        'name': contact_data['name'],
        'email': contact_data['email'],
    })
def get_contacts():
    """Return the contact list of the user owning ``token``.

    Each entry carries the contact's ``user_id``, ``name``, ``email`` and an
    ``online`` flag taken from ``active_users``. Contact IDs that no longer
    exist in the users file are skipped. Responds 401 for an unknown token.

    NOTE(review): no route decorator is visible for this handler in the
    reviewed source; confirm how it is registered.
    """
    # Tolerate a missing, malformed or non-object body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    user_id, user_data = get_user_by_token(data.get('token', ''))
    if not user_id:
        return jsonify({'error': 'Unauthorized'}), 401

    users = read_json(USERS_FILE, USERS_LOCK)
    contacts = [
        {
            'user_id': contact_id,
            'name': users[contact_id]['name'],
            'email': users[contact_id]['email'],
            'online': contact_id in active_users,
        }
        for contact_id in user_data.get('contacts', [])
        if contact_id in users
    ]
    return jsonify({'contacts': contacts})
def get_messages():
    """Return the conversation between the token's user and ``contact_id``.

    Messages in either direction between the two users are returned in
    ascending ``timestamp`` order. Responds 401 for an unknown token.

    NOTE(review): no route decorator is visible for this handler in the
    reviewed source; confirm how it is registered.
    """
    # Tolerate a missing, malformed or non-object body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    contact_id = data.get('contact_id', '')
    user_id, _ = get_user_by_token(data.get('token', ''))
    if not user_id:
        return jsonify({'error': 'Unauthorized'}), 401

    # Either direction of the (user, contact) pair belongs to the thread.
    participants = ((user_id, contact_id), (contact_id, user_id))
    all_messages = read_json(MESSAGES_FILE, MESSAGES_LOCK)
    conversation_messages = sorted(
        (msg for msg in all_messages if (msg['from'], msg['to']) in participants),
        key=lambda msg: msg['timestamp'],
    )
    return jsonify({'messages': conversation_messages})
def handle_connect():
    """Log the session id of a newly connected client."""
    sid = request.sid
    print(f'Client connected: {sid}')
def handle_authenticate(data):
    """Bind the current socket session to the user owning ``data['token']``.

    On success the session is recorded in ``active_users``, joined to a room
    named after the user ID, a ``user_status`` event is emitted through
    ``socketio`` and the caller receives ``authenticated`` with
    ``success=True``. Otherwise the caller receives ``authenticated`` with
    ``success=False``.
    """
    uid, record = get_user_by_token(data.get('token', ''))

    # Guard clause: unknown token.
    if not uid:
        emit('authenticated', {'success': False, 'error': 'Invalid token'})
        return

    active_users[uid] = request.sid
    join_room(uid)
    presence = {'user_id': uid, 'online': True}
    socketio.emit('user_status', presence)
    emit('authenticated', {'success': True, 'user_id': uid})
    print(f'User {uid} ({record["name"]}) authenticated')
def handle_disconnect(reason=None):
    """Remove the disconnecting session's user from ``active_users``.

    When the session id belongs to a tracked user, that user is dropped from
    ``active_users`` and a ``user_status`` event with ``online=False`` is
    emitted through ``socketio``. *reason* is accepted but not used.
    """
    current_sid = request.sid
    # First user whose recorded session id matches this connection, if any.
    departed = next(
        (uid for uid, sid in active_users.items() if sid == current_sid),
        None,
    )
    if not departed:
        return
    del active_users[departed]
    socketio.emit('user_status', {'user_id': departed, 'online': False})
    print(f'User {departed} disconnected')
def handle_send_message(data):
    """Persist a chat message and relay it to the recipient if connected.

    *data* is expected to carry ``token``, ``to`` and ``message``. The sender
    always receives ``message_sent`` (status ``sent``). When the recipient is
    in ``active_users`` the message is stored as ``delivered``, the recipient
    receives ``new_message`` and the sender additionally receives
    ``message_status_update``. An ``error`` event is emitted for an unknown
    token or for a missing recipient / empty message.
    """
    if not isinstance(data, dict):
        emit('error', {'message': 'Unauthorized'})
        return
    to_user_id = data.get('to', '')
    message_text = data.get('message', '')

    user_id, _ = get_user_by_token(data.get('token', ''))
    if not user_id:
        emit('error', {'message': 'Unauthorized'})
        return
    # Refuse messages with no usable recipient or no text rather than
    # persisting records nobody can ever receive.
    if (not isinstance(to_user_id, str) or not to_user_id
            or not isinstance(message_text, str) or not message_text.strip()):
        emit('error', {'message': 'Recipient and message are required'})
        return

    message_id = generate_message_id()
    message = {
        'message_id': message_id,
        'from': user_id,
        'to': to_user_id,
        'message': message_text,
        'timestamp': datetime.now().isoformat(),
        'status': 'sent',
    }

    # Decide the stored status up front so the messages file is rewritten
    # once per message instead of twice.
    recipient_online = to_user_id in active_users
    stored = dict(message)
    if recipient_online:
        stored['status'] = 'delivered'

    # Hold the lock across the read-append-write so concurrent sends cannot
    # drop each other's messages. FileLock is re-entrant, so the helpers may
    # acquire it again. Events are emitted after the lock is released.
    with MESSAGES_LOCK:
        messages = read_json(MESSAGES_FILE, MESSAGES_LOCK)
        messages.append(stored)
        write_json(MESSAGES_FILE, messages, MESSAGES_LOCK)

    emit('message_sent', message)
    if recipient_online:
        socketio.emit('new_message', stored, room=to_user_id)
        emit('message_status_update', {
            'message_id': message_id,
            'status': 'delivered',
        })
def handle_message_delivered(data):
    """Mark a ``sent`` message as ``delivered`` and notify its sender.

    *data* is expected to carry ``message_id``. Only a message whose stored
    status is still ``sent`` is changed. The file is rewritten and the sender
    notified only on that transition, so an already ``read`` message is never
    announced as merely ``delivered`` and no-op calls do not rewrite the
    store.
    """
    message_id = data.get('message_id', '') if isinstance(data, dict) else ''
    if not message_id:
        return

    from_user_id = None
    # Hold the lock across the read-modify-write; FileLock is re-entrant, so
    # the helpers may acquire it again.
    with MESSAGES_LOCK:
        messages = read_json(MESSAGES_FILE, MESSAGES_LOCK)
        for msg in messages:
            if msg['message_id'] != message_id:
                continue
            if msg['status'] == 'sent':
                msg['status'] = 'delivered'
                from_user_id = msg['from']
                write_json(MESSAGES_FILE, messages, MESSAGES_LOCK)
            break

    if from_user_id is not None and from_user_id in active_users:
        socketio.emit('message_status_update', {
            'message_id': message_id,
            'status': 'delivered',
        }, room=from_user_id)
def handle_message_read(data):
    """Mark one message as ``read`` and notify its sender if connected.

    *data* is expected to carry ``message_id``. An unknown message ID is
    ignored. Without initialising ``from_user_id``, the membership check at
    the end raised ``UnboundLocalError`` whenever no message matched.
    """
    message_id = data.get('message_id', '') if isinstance(data, dict) else ''

    from_user_id = None
    # Hold the lock across the read-modify-write; FileLock is re-entrant, so
    # the helpers may acquire it again.
    with MESSAGES_LOCK:
        messages = read_json(MESSAGES_FILE, MESSAGES_LOCK)
        for msg in messages:
            if msg['message_id'] != message_id:
                continue
            from_user_id = msg['from']
            # Skip the full-file rewrite when the status is already 'read'.
            if msg['status'] != 'read':
                msg['status'] = 'read'
                write_json(MESSAGES_FILE, messages, MESSAGES_LOCK)
            break

    if from_user_id is not None and from_user_id in active_users:
        socketio.emit('message_status_update', {
            'message_id': message_id,
            'status': 'read',
        }, room=from_user_id)
def handle_mark_conversation_read(data):
    """Mark every unread message from ``contact_id`` to the caller as ``read``.

    *data* is expected to carry ``token`` and ``contact_id``. Nothing happens
    for an unknown token. When the contact is in ``active_users`` they
    receive one ``message_status_update`` per message that changed.
    """
    if not isinstance(data, dict):
        return
    contact_id = data.get('contact_id', '')
    user_id, _ = get_user_by_token(data.get('token', ''))
    if not user_id:
        return

    updated_message_ids = []
    # Hold the lock across the read-modify-write; FileLock is re-entrant, so
    # the helpers may acquire it again.
    with MESSAGES_LOCK:
        messages = read_json(MESSAGES_FILE, MESSAGES_LOCK)
        for msg in messages:
            if (msg['from'] == contact_id and msg['to'] == user_id
                    and msg['status'] != 'read'):
                msg['status'] = 'read'
                updated_message_ids.append(msg['message_id'])
        # Only rewrite the store when something actually changed.
        if updated_message_ids:
            write_json(MESSAGES_FILE, messages, MESSAGES_LOCK)

    if contact_id in active_users:
        for msg_id in updated_message_ids:
            socketio.emit('message_status_update', {
                'message_id': msg_id,
                'status': 'read',
            }, room=contact_id)
def handle_typing(data):
    """Relay a typing indicator from the token's user to ``data['to']``.

    Nothing is sent when the token is unknown or the recipient is not in
    ``active_users``.
    """
    recipient = data.get('to', '')
    typing_flag = data.get('typing', False)
    sender_id, _ = get_user_by_token(data.get('token', ''))

    # Short-circuit keeps the recipient lookup behind the authentication check.
    if sender_id and recipient in active_users:
        payload = {'user_id': sender_id, 'typing': typing_flag}
        socketio.emit('user_typing', payload, room=recipient)
# Hugging Face Spaces compatibility: seed the data files, print a startup
# banner, then serve on all interfaces at port 7860.
if __name__ == '__main__':
    init_data_files()
    for banner_line in (
        "WhatsApp-like server starting on Hugging Face...",
        "Access the app at https://your-username-your-app-name.hf.space",
    ):
        print(banner_line)
    socketio.run(app, host='0.0.0.0', port=7860, debug=False)