# Source code for leancloud.engine

# coding: utf-8

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import os
import sys
import json
import warnings

from werkzeug.wrappers import Request
from werkzeug.wrappers import Response
from werkzeug.serving import run_simple

import leancloud
from . import utils
from . import leanengine
from .authorization import AuthorizationMiddleware
from .cookie_session import CookieSessionMiddleware  # noqa: F401
from .cors import CORSMiddleware
from .https_redirect_middleware import HttpsRedirectMiddleware  # noqa: F401
from .leanengine import LeanEngineApplication
from .leanengine import LeanEngineError
from .leanengine import after_delete
from .leanengine import after_save
from .leanengine import after_update
from .leanengine import before_delete
from .leanengine import before_save
from .leanengine import before_update
from .leanengine import context
from .leanengine import current
from .leanengine import register_cloud_func
from .leanengine import register_on_bigquery
from .leanengine import register_on_login
from .leanengine import register_on_auth_data
from .leanengine import register_on_verified
from .leanengine import user

__author__ = "asaka <lan@leancloud.rocks>"


class Engine(object):
    """
    LeanEngine middleware.

    A WSGI callable that answers the engine ping endpoints itself, forwards
    cloud function / hook paths to the internal cloud application and passes
    every other request to the wrapped WSGI application.
    """

    def __init__(self, wsgi_app=None, fetch_user=True):
        """
        LeanEngine middleware constructor.

        :param wsgi_app: wsgi callable
        :param fetch_user: should fetch user's data from server while processing session token.
        :type fetch_user: bool
        """
        self.current = current
        if wsgi_app:
            leanengine.root_engine = self
        self.origin_app = wsgi_app
        self.app = LeanEngineApplication(fetch_user=fetch_user)
        self.cloud_app = context.local_manager.make_middleware(
            CORSMiddleware(AuthorizationMiddleware(self.app))
        )

    def __call__(self, environ, start_response):
        """
        WSGI entry point.

        :param environ: WSGI environ dict
        :param start_response: WSGI ``start_response`` callable
        :return: the WSGI response iterable of whichever application handled
            the request
        """
        request = Request(environ)
        # cache werkzeug request for other middlewares
        environ["leanengine.request"] = request

        path = request.path
        if path in ("/__engine/1/ping", "/__engine/1.1/ping/"):
            version = sys.version_info
            payload = json.dumps(
                {
                    "version": leancloud.__version__,
                    "runtime": "cpython-{0}.{1}.{2}".format(
                        version.major, version.minor, version.micro
                    ),
                }
            )
            # Let the Response object start the response exactly once:
            # calling start_response by hand and then again through Response
            # violates the WSGI protocol and loses the JSON content type.
            return Response(payload, mimetype="application/json")(
                environ, start_response
            )

        cloud_prefixes = (
            "/__engine/",
            "/1/functions",
            "/1.1/functions",
            "/1/call",
            "/1.1/call",
        )
        if path.startswith(cloud_prefixes):
            return self.cloud_app(environ, start_response)

        return self.origin_app(environ, start_response)

    def wrap(self, wsgi_app):
        """
        Attach ``wsgi_app`` as the application that receives non-engine
        requests and register this engine as the root engine.

        :param wsgi_app: wsgi callable
        :return: this engine
        :raises RuntimeError: if a root engine has already been registered
        """
        if leanengine.root_engine:
            raise RuntimeError("It's forbidden that overwriting wsgi_func.")
        leanengine.root_engine = self
        self.origin_app = wsgi_app
        return self

    def register(self, engine):
        """
        Merge the cloud codes of another engine into this one.

        :param engine: the engine whose cloud codes are copied
        :type engine: Engine
        :raises TypeError: if ``engine`` is not an :class:`Engine` instance
        """
        if not isinstance(engine, Engine):
            raise TypeError("Please specify an Engine instance")
        self.app.update_cloud_codes(engine.app.cloud_codes)

    def define(self, *args, **kwargs):
        """Forward to ``register_cloud_func`` with this engine's cloud codes."""
        return register_cloud_func(self.app.cloud_codes, *args, **kwargs)

    def on_verified(self, *args, **kwargs):
        """Forward to ``register_on_verified`` with this engine's cloud codes."""
        return register_on_verified(self.app.cloud_codes, *args, **kwargs)

    def on_login(self, *args, **kwargs):
        """Forward to ``register_on_login`` with this engine's cloud codes."""
        return register_on_login(self.app.cloud_codes, *args, **kwargs)

    def on_auth_data(self, *args, **kwargs):
        """Forward to ``register_on_auth_data`` with this engine's cloud codes."""
        return register_on_auth_data(self.app.cloud_codes, *args, **kwargs)

    def on_bigquery(self, *args, **kwargs):
        """
        Deprecated alias of :meth:`on_insight`.

        Emits a ``leancloud.LeanCloudWarning`` and then forwards to
        ``register_on_bigquery`` with this engine's cloud codes.
        """
        warnings.warn(
            "on_bigquery is deprecated, please use on_insight instead",
            leancloud.LeanCloudWarning,
        )
        return register_on_bigquery(self.app.cloud_codes, *args, **kwargs)

    def before_save(self, *args, **kwargs):
        """Forward to ``before_save`` with this engine's cloud codes."""
        return before_save(self.app.cloud_codes, *args, **kwargs)

    def after_save(self, *args, **kwargs):
        """Forward to ``after_save`` with this engine's cloud codes."""
        return after_save(self.app.cloud_codes, *args, **kwargs)

    def before_update(self, *args, **kwargs):
        """Forward to ``before_update`` with this engine's cloud codes."""
        return before_update(self.app.cloud_codes, *args, **kwargs)

    def after_update(self, *args, **kwargs):
        """Forward to ``after_update`` with this engine's cloud codes."""
        return after_update(self.app.cloud_codes, *args, **kwargs)

    def before_delete(self, *args, **kwargs):
        """Forward to ``before_delete`` with this engine's cloud codes."""
        return before_delete(self.app.cloud_codes, *args, **kwargs)

    def after_delete(self, *args, **kwargs):
        """Forward to ``after_delete`` with this engine's cloud codes."""
        return after_delete(self.app.cloud_codes, *args, **kwargs)

    def on_insight(self, *args, **kwargs):
        """Forward to ``register_on_bigquery`` with this engine's cloud codes."""
        return register_on_bigquery(self.app.cloud_codes, *args, **kwargs)

    def run(self, *args, **kwargs):
        """Forward all arguments to werkzeug's ``run_simple``."""
        return run_simple(*args, **kwargs)

    def start(self):
        """
        Serve this engine with gevent's ``WSGIServer`` until stopped.

        The listening port is read from ``LEANCLOUD_APP_PORT``. When
        ``leancloud`` has no ``APP_ID`` attribute, the SDK is initialised from
        the ``LEANCLOUD_APP_ID``, ``LEANCLOUD_APP_KEY``,
        ``LEANCLOUD_APP_MASTER_KEY`` and ``LEANCLOUD_APP_HOOK_KEY``
        environment variables first.

        :raises KeyError: if a required credential variable is missing
        :raises TypeError: if ``LEANCLOUD_APP_PORT`` is not set
        :raises RuntimeError: if a root engine has already been registered
        """
        from gevent.pywsgi import WSGIServer

        # Read the port unconditionally so it is also defined when the SDK
        # was initialised before start() was called.
        port = int(os.environ.get("LEANCLOUD_APP_PORT"))

        if not hasattr(leancloud, "APP_ID"):
            app_id = os.environ["LEANCLOUD_APP_ID"]
            app_key = os.environ["LEANCLOUD_APP_KEY"]
            master_key = os.environ["LEANCLOUD_APP_MASTER_KEY"]
            hook_key = os.environ["LEANCLOUD_APP_HOOK_KEY"]
            leancloud.init(
                app_id, app_key=app_key, master_key=master_key, hook_key=hook_key
            )

        def application(environ, start_response):
            # Fallback app for requests that are not engine paths.
            # WSGI wants native-str status/headers and an iterable of bytes.
            start_response(
                utils.to_native("200 OK"),
                [(utils.to_native("Content-Type"), utils.to_native("text/plain"))],
            )
            return [b"This is a LeanEngine application."]

        class NopLogger(object):
            # Discards everything the server writes to its access log.
            def write(self, s):
                pass

        app = self.wrap(application)
        self.server = WSGIServer(("", port), app, log=NopLogger())
        print("LeanEngine Cloud Functions app is running, port:", port)
        self.server.serve_forever()

    def stop(self):
        """Stop the server created by :meth:`start`."""
        self.server.stop()
# Public names exported by a star-import of this module.
__all__ = [
    "user",
    "Engine",
    "LeanEngineError",
]