"""Flask service that enrolls and verifies faces, persisting data in MinIO.

Each user is one object in the ``users`` bucket, keyed by the user id.  The
object is a pickle of either a placeholder list (id reserved, no face enrolled
yet) or the face encoding produced by ``face_recognition``.
"""
from flask import Flask, request, jsonify
from minio import Minio
from minio.error import (ResponseError, BucketAlreadyOwnedByYou,
                         BucketAlreadyExists)
import os
import tempfile
import uuid
import pickle
import face_recognition
from werkzeug.datastructures import ImmutableMultiDict
import base64
import cv2
import numpy as np
import imutils

app = Flask(__name__)

minioClient = Minio('minio:9000',
                    access_key=os.environ['s3-name'],
                    secret_key=os.environ['s3-password'],
                    secure=False)

# Bucket that holds one object per user id.
USERS_BUCKET = 'users'
# How many random ids are tried before giving up on finding an unused one.
MAX_ID_ATTEMPTS = 5


@app.route('/')
def hello_docker():
    """Liveness endpoint; returns a fixed greeting."""
    return 'Hello, I run in a docker container'


@app.route('/acces_minio')
def access_minio():
    """Print every bucket visible to the MinIO client and report success."""
    for bucket in minioClient.list_buckets():
        print(bucket.name, bucket.creation_date, flush=True)
    return 'Connection Succesfull'


@app.route('/new_user_id')
def new_user_id():
    """Reserve a fresh user id and return it as ``{'id': <uuid4 string>}``.

    The id is reserved by uploading a placeholder pickle under that name;
    ``/init_face`` later overwrites it with the real face encoding.

    Returns ``{'id': None}`` with HTTP 500 if no unused id was found within
    ``MAX_ID_ATTEMPTS`` tries, instead of overwriting an existing user.
    """
    for _ in range(MAX_ID_ATTEMPTS):
        user_id = str(uuid.uuid4())
        if not check_id(user_id):
            break
    else:
        # Every candidate collided with an existing object: refuse rather
        # than clobber somebody else's stored encoding.
        return jsonify({'id': None}), 500

    # Placeholder object; check_face recognises it by its list type.
    demo_object = ['test']
    with tempfile.TemporaryDirectory() as workdir:
        placeholder_path = os.path.join(workdir, 'demo_object.pkl')
        with open(placeholder_path, 'wb') as f:
            pickle.dump(demo_object, f)
        minioClient.fput_object(USERS_BUCKET, user_id, placeholder_path)
    return jsonify({'id': user_id})


@app.route('/init_face')
# call like https://face.sguba.de/init_face?id=123&encoded_string=abc
def new_face():
    """Enroll the face in the uploaded image for the given user id.

    Query parameters:
        id: user id under which the encoding is stored.
        encoded_string: URL-safe base64 encoded image.

    Returns ``{'success': bool}``; ``success`` is False when the image could
    not be decoded or contains no detectable face.  Missing parameters yield
    HTTP 400.
    """
    user_id = request.args.get('id', None)
    encoded_img = request.args.get('encoded_string', None)
    if not user_id or not encoded_img:
        return jsonify({'success': False}), 400

    # Work in a private temp directory: the id comes from the request and
    # must never be used to build a filesystem path.
    with tempfile.TemporaryDirectory() as workdir:
        face_encoding_response = _encode_uploaded_face(encoded_img, workdir)
        if face_encoding_response['success']:
            pkl_path = os.path.join(workdir, 'encoding.pkl')
            with open(pkl_path, 'wb') as file:
                pickle.dump(face_encoding_response['encoding'], file)
            minioClient.fput_object(USERS_BUCKET, user_id, pkl_path)
    return jsonify({'success': face_encoding_response['success']})


@app.route('/check_face')
# call like https://face.sguba.de/check_face?id=123&encoded_string=abc
def check_face():
    """Compare the face in the uploaded image with the one stored for an id.

    Query parameters:
        id: user id whose stored encoding is compared against.
        encoded_string: URL-safe base64 encoded image.

    Returns ``{'result': 'True'|'False', 'success': bool}``.  ``success``
    tells whether a face could be extracted from the upload; ``result`` is
    the stringified comparison outcome and is ``'False'`` when no face was
    found or no face has been enrolled for the id yet.  Missing parameters
    yield HTTP 400.
    """
    user_id = request.args.get('id', None)
    encoded_img = request.args.get('encoded_string', None)
    if not user_id or not encoded_img:
        return jsonify({'result': str(False), 'success': False}), 400

    result = False
    with tempfile.TemporaryDirectory() as workdir:
        face_encoding_response = _encode_uploaded_face(encoded_img, workdir)
        if face_encoding_response['success']:
            pkl_path = os.path.join(workdir, 'encoding.pkl')
            minioClient.fget_object(USERS_BUCKET, user_id, pkl_path)
            # NOTE(security): pickle.load executes arbitrary code from the
            # payload.  This is only safe while nothing but this service can
            # write to the bucket; consider a non-executable format instead.
            with open(pkl_path, 'rb') as file:
                stored_encoding = pickle.load(file)
            # A list is the placeholder written by new_user_id, i.e. no face
            # has been enrolled yet; comparing against it would raise.
            if not isinstance(stored_encoding, list):
                result = face_recognition.compare_faces(
                    [stored_encoding],
                    face_encoding_response['encoding'])[0]
    return jsonify({'result': str(result),
                    'success': face_encoding_response['success']})


def _encode_uploaded_face(encoded_img, workdir):
    """Decode the upload into ``workdir`` and return its encode_face result.

    Returns the failure response when the image could not be written.
    """
    img_path = os.path.join(workdir, 'upload.jpg')
    if not save_img(encoded_img, img_path):
        return {'success': False, 'encoding': None}
    return encode_face(img_path)


def check_id(id):
    """Return True if ``id`` is already used as an object name, else False."""
    return any(id == user.object_name
               for user in minioClient.list_objects(USERS_BUCKET))


def save_img(encoded_data, filename):
    """Decode a URL-safe base64 image, rotate it by 90 degrees and save it.

    Args:
        encoded_data: URL-safe base64 text; missing ``=`` padding is restored.
        filename: destination path; the extension selects the image format.

    Returns:
        The result of ``cv2.imwrite``, or False when the payload is not valid
        base64, is empty, or cannot be decoded as an image.
    """
    # Clients may strip the trailing '=' padding; pad to a multiple of four.
    encoded_data += "=" * (-len(encoded_data) % 4)
    try:
        raw = base64.urlsafe_b64decode(bytes(encoded_data, encoding='utf-8'))
    except ValueError:  # binascii.Error is a ValueError subclass
        return False
    if not raw:
        return False

    # frombuffer replaces the deprecated binary mode of np.fromstring.
    nparr = np.frombuffer(raw, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_ANYCOLOR)
    if img is None:
        # imdecode signals an undecodable buffer by returning None.
        return False

    # NOTE(review): presumably compensates for the client's camera
    # orientation -- confirm.
    img = imutils.rotate(img, 90)
    return cv2.imwrite(filename, img)


def encode_face(path):
    """Compute the face encoding of the image stored at ``path``.

    Returns:
        ``{'success': True, 'encoding': <first encoding found>}`` when at
        least one face was detected, otherwise
        ``{'success': False, 'encoding': None}``.
    """
    face = face_recognition.load_image_file(path)
    face_encodings = face_recognition.face_encodings(face)
    if len(face_encodings) == 0:
        return {'success': False, 'encoding': None}
    return {'success': True, 'encoding': face_encodings[0]}


def setup():
    """Create the users bucket, tolerating that it already exists.

    Any other error raised by ``make_bucket`` propagates to the caller.
    """
    try:
        minioClient.make_bucket("users")
    except (BucketAlreadyOwnedByYou, BucketAlreadyExists):
        pass


if __name__ == '__main__':
    setup()
    app.run(host='0.0.0.0')