leancloud.query 源代码

# coding: utf-8

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import json

import six

import leancloud
from leancloud import client
from leancloud import utils
from leancloud.file_ import File
from leancloud.object_ import Object
from leancloud.errors import LeanCloudError

__author__ = "asaka <lan@leancloud.rocks>"


class CQLResult(object):
    """
    CQL 查询结果对象。

    Attributes:
        results: 返回的查询结果

        count: 如果查询语句包含 count,将保存在此字段

        class_name: 查询的 class 名称
    """

    __slots__ = ["results", "count", "class_name"]

    def __init__(self, results, count, class_name):
        self.results = results
        self.count = count
        self.class_name = class_name


class Cursor(object):
    """
    Query.scan 返回结果对象。
    """

    def __init__(self, query_class, batch_size, scan_key, params):
        self._params = params
        self._query_class = query_class

        if batch_size is not None:
            self._params["limit"] = batch_size

        if scan_key is not None:
            self._params["scan_key"] = scan_key

    def __iter__(self):
        while True:
            content = client.get(
                "/scan/classes/{}".format(self._query_class._class_name), self._params
            ).json()
            for result in content["results"]:
                obj = self._query_class()
                obj._update_data(result)
                yield obj

            if not content.get("cursor"):
                break

            self._params["cursor"] = content["cursor"]


[文档]class Query(object): def __init__(self, query_class): """ :param query_class: 要查询的 class 名称或者对象 :type query_class: string_types or leancloud.ObjectMeta """ if isinstance(query_class, six.string_types): if query_class in ("File", "_File"): query_class = File else: query_class = Object.extend(query_class) if not isinstance(query_class, (type, six.class_types)) or not issubclass( query_class, (File, Object) ): raise ValueError("Query takes string or LeanCloud Object") self._query_class = query_class self._where = {} self._include = [] self._include_acl = None self._limit = -1 self._skip = 0 self._extra = {} self._order = [] self._select = []
[文档] @classmethod def or_(cls, *queries): """ 根据传入的 Query 对象,构造一个新的 OR 查询。 :param queries: 需要构造的子查询列表 :rtype: Query """ if len(queries) < 2: raise ValueError("or_ need two queries at least") if not all( x._query_class._class_name == queries[0]._query_class._class_name for x in queries ): raise TypeError("All queries must be for the same class") query = Query(queries[0]._query_class._class_name) query._or_query(queries) return query
[文档] @classmethod def and_(cls, *queries): """ 根据传入的 Query 对象,构造一个新的 AND 查询。 :param queries: 需要构造的子查询列表 :rtype: Query """ if len(queries) < 2: raise ValueError("and_ need two queries at least") if not all( x._query_class._class_name == queries[0]._query_class._class_name for x in queries ): raise TypeError("All queries must be for the same class") query = Query(queries[0]._query_class._class_name) query._and_query(queries) return query
[文档] @classmethod def do_cloud_query(cls, cql, *pvalues): """ 使用 CQL 来构造查询。CQL 语法参考 `这里 <https://cn.avoscloud.com/docs/cql_guide.html>`_。 :param cql: CQL 语句 :param pvalues: 查询参数 :rtype: CQLResult """ params = {"cql": cql} if len(pvalues) == 1 and isinstance(pvalues[0], (tuple, list)): pvalues = json.dumps(pvalues[0]) if len(pvalues) > 0: params["pvalues"] = json.dumps(pvalues) content = client.get("/cloudQuery", params).json() objs = [] query = cls(content["className"]) for result in content["results"]: obj = query._new_object() obj._update_data(query._process_result(result)) objs.append(obj) return CQLResult(objs, content.get("count"), content.get("className"))
[文档] def dump(self): """ :return: 当前对象的序列化结果 :rtype: dict """ params = { "where": self._where, } if self._include: params["include"] = ",".join(self._include) if self._select: params["keys"] = ",".join(self._select) if self._include_acl is not None: params["returnACL"] = json.dumps(self._include_acl) if self._limit >= 0: params["limit"] = self._limit if self._skip > 0: params["skip"] = self._skip if self._order: params["order"] = ",".join(self._order) params.update(self._extra) return params
def _new_object(self): return self._query_class() def _process_result(self, obj): return obj def _do_request(self, params): return client.get( "/classes/{0}".format(self._query_class._class_name), params ).json()
[文档] def first(self): """ 根据查询获取最多一个对象。 :return: 查询结果 :rtype: Object :raise: LeanCloudError """ params = self.dump() params["limit"] = 1 content = self._do_request(params) results = content["results"] if not results: raise LeanCloudError(101, "Object not found") obj = self._new_object() obj._update_data(self._process_result(results[0])) return obj
[文档] def get(self, object_id): """ 根据 objectId 查询。 :param object_id: 要查询对象的 objectId :return: 查询结果 :rtype: Object """ if not object_id: raise LeanCloudError(code=101, error="Object not found.") obj = self._query_class.create_without_data(object_id) obj.fetch(select=self._select, include=self._include) return obj
[文档] def find(self): """ 根据查询条件,获取包含所有满足条件的对象。 :rtype: list """ content = self._do_request(self.dump()) objs = [] for result in content["results"]: obj = self._new_object() obj._update_data(self._process_result(result)) objs.append(obj) return objs
[文档] def scan(self, batch_size=None, scan_key=None): params = self.dump() if "skip" in params: raise LeanCloudError(1, "Query.scan dose not support skip option") if "limit" in params: raise LeanCloudError(1, "Query.scan dose not support limit option") return Cursor(self._query_class, batch_size, scan_key, params)
[文档] def count(self): """ 返回满足查询条件的对象的数量。 :rtype: int """ params = self.dump() params["limit"] = 0 params["count"] = 1 content = self._do_request(params) return content["count"]
[文档] def skip(self, n): """ 查询条件中跳过指定个数的对象,在做分页时很有帮助。 :param n: 需要跳过对象的个数 :rtype: Query """ self._skip = n return self
[文档] def limit(self, n): """ 设置查询返回结果的数量。如果不设置,默认为 100。最大返回数量为 1000,如果超过这个数量,需要使用多次查询来获取结果。 :param n: 限制结果的数量 :rtype: Query """ if n > 1000: raise ValueError("limit only accept number less than or equal to 1000") self._limit = n return self
[文档] def include_acl(self, value=True): """ 设置查询结果的对象,是否包含 ACL 字段。需要在控制台选项中开启对应选项才能生效。 :param value: 是否包含 ACL,默认为 True :type value: bool :rtype: Query """ self._include_acl = value return self
[文档] def equal_to(self, key, value): """ 增加查询条件,查询字段的值必须为指定值。 :param key: 查询条件的字段名 :param value: 查询条件的值 :rtype: Query """ self._where[key] = utils.encode(value) return self
[文档] def size_equal_to(self, key, size): """ 增加查询条件,限制查询结果指定数组字段长度与查询值相同 :param key: 查询条件数组字段名 :param size: 查询条件值 :rtype: Query """ self._add_condition(key, "$size", size) return self
def _add_condition(self, key, condition, value): if not self._where.get(key): self._where[key] = {} self._where[key][condition] = utils.encode(value) return self
[文档] def not_equal_to(self, key, value): """ 增加查询条件,限制查询结果指定字段的值与查询值不同 :param key: 查询条件字段名 :param value: 查询条件值 :rtype: Query """ self._add_condition(key, "$ne", value) return self
[文档] def less_than(self, key, value): """ 增加查询条件,限制查询结果指定字段的值小于查询值 :param key: 查询条件字段名 :param value: 查询条件值 :rtype: Query """ self._add_condition(key, "$lt", value) return self
[文档] def greater_than(self, key, value): """ 增加查询条件,限制查询结果指定字段的值大于查询值 :param key: 查询条件字段名 :param value: 查询条件值 :rtype: Query """ self._add_condition(key, "$gt", value) return self
[文档] def less_than_or_equal_to(self, key, value): """ 增加查询条件,限制查询结果指定字段的值小于等于查询值 :param key: 查询条件字段名 :param value: 查询条件值 :rtype: Query """ self._add_condition(key, "$lte", value) return self
[文档] def greater_than_or_equal_to(self, key, value): """ 增加查询条件,限制查询结果指定字段的值大于等于查询值 :param key: 查询条件字段名 :param value: 查询条件值名 :rtype: Query """ self._add_condition(key, "$gte", value) return self
[文档] def contained_in(self, key, values): """ 增加查询条件,限制查询结果指定字段的值在查询值列表中 :param key: 查询条件字段名 :param values: 查询条件值 :type values: list or tuple :rtype: Query """ self._add_condition(key, "$in", values) return self
[文档] def not_contained_in(self, key, values): """ 增加查询条件,限制查询结果指定字段的值不在查询值列表中 :param key: 查询条件字段名 :param values: 查询条件值 :type values: list or tuple :rtype: Query """ self._add_condition(key, "$nin", values) return self
[文档] def contains_all(self, key, values): """ 增加查询条件,限制查询结果指定字段的值全部包含与查询值列表中 :param key: 查询条件字段名 :param values: 查询条件值 :type values: list or tuple :rtype: Query """ self._add_condition(key, "$all", values) return self
[文档] def exists(self, key): """ 增加查询条件,限制查询结果对象包含指定字段 :param key: 查询条件字段名 :rtype: Query """ self._add_condition(key, "$exists", True) return self
[文档] def does_not_exist(self, key): """ 增加查询条件,限制查询结果对象不包含指定字段 :param key: 查询条件字段名 :rtype: Query """ self._add_condition(key, "$exists", False) return self
[文档] def matched(self, key, regex, ignore_case=False, multi_line=False): """ 增加查询条件,限制查询结果对象指定字段满足指定的正则表达式。 :param key: 查询条件字段名 :param regex: 查询正则表达式 :param ignore_case: 查询是否忽略大小写,默认不忽略 :param multi_line: 查询是否匹配多行,默认不匹配 :rtype: Query """ if not isinstance(regex, six.string_types): raise TypeError("matched only accept str or unicode") self._add_condition(key, "$regex", regex) modifiers = "" if ignore_case: modifiers += "i" if multi_line: modifiers += "m" if modifiers: self._add_condition(key, "$options", modifiers) return self
[文档] def matches_query(self, key, query): """ 增加查询条件,限制查询结果对象指定字段的值,与另外一个查询对象的返回结果相同。 :param key: 查询条件字段名 :param query: 查询对象 :type query: Query :rtype: Query """ dumped = query.dump() dumped["className"] = query._query_class._class_name self._add_condition(key, "$inQuery", dumped) return self
[文档] def does_not_match_query(self, key, query): """ 增加查询条件,限制查询结果对象指定字段的值,与另外一个查询对象的返回结果不相同。 :param key: 查询条件字段名 :param query: 查询对象 :type query: Query :rtype: Query """ dumped = query.dump() dumped["className"] = query._query_class._class_name self._add_condition(key, "$notInQuery", dumped) return self
[文档] def matches_key_in_query(self, key, query_key, query): """ 增加查询条件,限制查询结果对象指定字段的值,与另外一个查询对象的返回结果指定的值相同。 :param key: 查询条件字段名 :param query_key: 查询对象返回结果的字段名 :param query: 查询对象 :type query: Query :rtype: Query """ dumped = query.dump() dumped["className"] = query._query_class._class_name self._add_condition(key, "$select", {"key": query_key, "query": dumped}) return self
[文档] def does_not_match_key_in_query(self, key, query_key, query): """ 增加查询条件,限制查询结果对象指定字段的值,与另外一个查询对象的返回结果指定的值不相同。 :param key: 查询条件字段名 :param query_key: 查询对象返回结果的字段名 :param query: 查询对象 :type query: Query :rtype: Query """ dumped = query.dump() dumped["className"] = query._query_class._class_name self._add_condition(key, "$dontSelect", {"key": query_key, "query": dumped}) return self
def _or_query(self, queries): dumped = [q.dump()["where"] for q in queries] self._where["$or"] = dumped return self def _and_query(self, queries): dumped = [q.dump()["where"] for q in queries] self._where["$and"] = dumped def _quote(self, s): # return "\\Q" + s.replace("\\E", "\\E\\\\E\\Q") + "\\E" return s
[文档] def contains(self, key, value): """ 增加查询条件,限制查询结果对象指定最短的值,包含指定字符串。在数据量比较大的情况下会比较慢。 :param key: 查询条件字段名 :param value: 需要包含的字符串 :rtype: Query """ self._add_condition(key, "$regex", self._quote(value)) return self
[文档] def startswith(self, key, value): """ 增加查询条件,限制查询结果对象指定最短的值,以指定字符串开头。在数据量比较大的情况下会比较慢。 :param key: 查询条件字段名 :param value: 需要查询的字符串 :rtype: Query """ value = value if isinstance(value, six.text_type) else value.decode("utf-8") self._add_condition(key, "$regex", "^" + self._quote(value)) return self
[文档] def endswith(self, key, value): """ 增加查询条件,限制查询结果对象指定最短的值,以指定字符串结尾。在数据量比较大的情况下会比较慢。 :param key: 查询条件字段名 :param value: 需要查询的字符串 :rtype: Query """ value = value if isinstance(value, six.text_type) else value.decode("utf-8") self._add_condition(key, "$regex", self._quote(value) + "$") return self
[文档] def ascending(self, key): """ 限制查询返回结果以指定字段升序排序。 :param key: 排序字段名 :rtype: Query """ self._order = [key] return self
[文档] def add_ascending(self, key): """ 增加查询排序条件。之前指定的排序条件优先级更高。 :param key: 排序字段名 :rtype: Query """ self._order.append(key) return self
[文档] def descending(self, key): """ 限制查询返回结果以指定字段降序排序。 :param key: 排序字段名 :rtype: Query """ self._order = ["-{0}".format(key)] return self
[文档] def add_descending(self, key): """ 增加查询排序条件。之前指定的排序条件优先级更高。 :param key: 排序字段名 :rtype: Query """ self._order.append("-{0}".format(key)) return self
[文档] def near(self, key, point): """ 增加查询条件,限制返回结果指定字段值的位置与给定地理位置临近。 :param key: 查询条件字段名 :param point: 需要查询的地理位置 :rtype: Query """ if point is None: raise ValueError("near query does not accept None") self._add_condition(key, "$nearSphere", point) return self
[文档] def within_radians(self, key, point, max_distance, min_distance=None): """ 增加查询条件,限制返回结果指定字段值的位置在某点的一段距离之内。 :param key: 查询条件字段名 :param point: 查询地理位置 :param max_distance: 最大距离限定(弧度) :param min_distance: 最小距离限定(弧度) :rtype: Query """ self.near(key, point) self._add_condition(key, "$maxDistance", max_distance) if min_distance is not None: self._add_condition(key, "$minDistance", min_distance) return self
[文档] def within_miles(self, key, point, max_distance, min_distance=None): """ 增加查询条件,限制返回结果指定字段值的位置在某点的一段距离之内。 :param key: 查询条件字段名 :param point: 查询地理位置 :param max_distance: 最大距离限定(英里) :param min_distance: 最小距离限定(英里) :rtype: Query """ if min_distance is not None: min_distance = min_distance / 3958.8 return self.within_radians(key, point, max_distance / 3958.8, min_distance)
[文档] def within_kilometers(self, key, point, max_distance, min_distance=None): """ 增加查询条件,限制返回结果指定字段值的位置在某点的一段距离之内。 :param key: 查询条件字段名 :param point: 查询地理位置 :param max_distance: 最大距离限定(千米) :param min_distance: 最小距离限定(千米) :rtype: Query """ if min_distance is not None: min_distance = min_distance / 6371.0 return self.within_radians(key, point, max_distance / 6371.0, min_distance)
[文档] def within_geo_box(self, key, southwest, northeast): """ 增加查询条件,限制返回结果指定字段值的位置在指定坐标范围之内。 :param key: 查询条件字段名 :param southwest: 限制范围西南角坐标 :param northeast: 限制范围东北角坐标 :rtype: Query """ self._add_condition(key, "$within", {"$box": [southwest, northeast]}) return self
[文档] def include(self, *keys): """ 指定查询返回结果中包含关联表字段。 :param keys: 关联子表字段名 :rtype: Query """ if len(keys) == 1 and isinstance(keys[0], (list, tuple)): keys = keys[0] self._include += keys return self
[文档] def select(self, *keys): """ 指定查询返回结果中只包含某些字段。可以重复调用,每次调用的包含内容都将会被返回。 :param keys: 包含字段名 :rtype: Query """ if len(keys) == 1 and isinstance(keys[0], (list, tuple)): keys = keys[0] self._select += keys return self
[文档]class FriendshipQuery(Query): def __init__(self, query_class): super(FriendshipQuery, self).__init__(query_class) if query_class in ("_Follower", "Follower"): self._friendship_tag = "follower" elif query_class in ("_Followee", "Followee"): self._friendship_tag = "followee" else: raise TypeError("FriendshipQuery takes only follower or followee") def _new_object(self): return leancloud.User() def _process_result(self, obj): content = obj[self._friendship_tag] if content["__type"] == "Pointer" and content["className"] == "_User": del content["__type"] del content["className"] return content