# coding: utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import json
import six
import leancloud
from leancloud import client
from leancloud import utils
from leancloud.file_ import File
from leancloud.object_ import Object
from leancloud.errors import LeanCloudError
__author__ = "asaka <lan@leancloud.rocks>"
class CQLResult(object):
"""
CQL 查询结果对象。
Attributes:
results: 返回的查询结果
count: 如果查询语句包含 count,将保存在此字段
class_name: 查询的 class 名称
"""
__slots__ = ["results", "count", "class_name"]
def __init__(self, results, count, class_name):
self.results = results
self.count = count
self.class_name = class_name
class Cursor(object):
"""
Query.scan 返回结果对象。
"""
def __init__(self, query_class, batch_size, scan_key, params):
self._params = params
self._query_class = query_class
if batch_size is not None:
self._params["limit"] = batch_size
if scan_key is not None:
self._params["scan_key"] = scan_key
def __iter__(self):
while True:
content = client.get(
"/scan/classes/{}".format(self._query_class._class_name), self._params
).json()
for result in content["results"]:
obj = self._query_class()
obj._update_data(result)
yield obj
if not content.get("cursor"):
break
self._params["cursor"] = content["cursor"]
[文档]class Query(object):
def __init__(self, query_class):
"""
:param query_class: 要查询的 class 名称或者对象
:type query_class: string_types or leancloud.ObjectMeta
"""
if isinstance(query_class, six.string_types):
if query_class in ("File", "_File"):
query_class = File
else:
query_class = Object.extend(query_class)
if not isinstance(query_class, (type, six.class_types)) or not issubclass(
query_class, (File, Object)
):
raise ValueError("Query takes string or LeanCloud Object")
self._query_class = query_class
self._where = {}
self._include = []
self._include_acl = None
self._limit = -1
self._skip = 0
self._extra = {}
self._order = []
self._select = []
[文档] @classmethod
def or_(cls, *queries):
"""
根据传入的 Query 对象,构造一个新的 OR 查询。
:param queries: 需要构造的子查询列表
:rtype: Query
"""
if len(queries) < 2:
raise ValueError("or_ need two queries at least")
if not all(
x._query_class._class_name == queries[0]._query_class._class_name
for x in queries
):
raise TypeError("All queries must be for the same class")
query = Query(queries[0]._query_class._class_name)
query._or_query(queries)
return query
[文档] @classmethod
def and_(cls, *queries):
"""
根据传入的 Query 对象,构造一个新的 AND 查询。
:param queries: 需要构造的子查询列表
:rtype: Query
"""
if len(queries) < 2:
raise ValueError("and_ need two queries at least")
if not all(
x._query_class._class_name == queries[0]._query_class._class_name
for x in queries
):
raise TypeError("All queries must be for the same class")
query = Query(queries[0]._query_class._class_name)
query._and_query(queries)
return query
[文档] @classmethod
def do_cloud_query(cls, cql, *pvalues):
"""
使用 CQL 来构造查询。CQL 语法参考 `这里 <https://cn.avoscloud.com/docs/cql_guide.html>`_。
:param cql: CQL 语句
:param pvalues: 查询参数
:rtype: CQLResult
"""
params = {"cql": cql}
if len(pvalues) == 1 and isinstance(pvalues[0], (tuple, list)):
pvalues = json.dumps(pvalues[0])
if len(pvalues) > 0:
params["pvalues"] = json.dumps(pvalues)
content = client.get("/cloudQuery", params).json()
objs = []
query = cls(content["className"])
for result in content["results"]:
obj = query._new_object()
obj._update_data(query._process_result(result))
objs.append(obj)
return CQLResult(objs, content.get("count"), content.get("className"))
[文档] def dump(self):
"""
:return: 当前对象的序列化结果
:rtype: dict
"""
params = {
"where": self._where,
}
if self._include:
params["include"] = ",".join(self._include)
if self._select:
params["keys"] = ",".join(self._select)
if self._include_acl is not None:
params["returnACL"] = json.dumps(self._include_acl)
if self._limit >= 0:
params["limit"] = self._limit
if self._skip > 0:
params["skip"] = self._skip
if self._order:
params["order"] = ",".join(self._order)
params.update(self._extra)
return params
def _new_object(self):
return self._query_class()
def _process_result(self, obj):
return obj
def _do_request(self, params):
return client.get(
"/classes/{0}".format(self._query_class._class_name), params
).json()
[文档] def first(self):
"""
根据查询获取最多一个对象。
:return: 查询结果
:rtype: Object
:raise: LeanCloudError
"""
params = self.dump()
params["limit"] = 1
content = self._do_request(params)
results = content["results"]
if not results:
raise LeanCloudError(101, "Object not found")
obj = self._new_object()
obj._update_data(self._process_result(results[0]))
return obj
[文档] def get(self, object_id):
"""
根据 objectId 查询。
:param object_id: 要查询对象的 objectId
:return: 查询结果
:rtype: Object
"""
if not object_id:
raise LeanCloudError(code=101, error="Object not found.")
obj = self._query_class.create_without_data(object_id)
obj.fetch(select=self._select, include=self._include)
return obj
[文档] def find(self):
"""
根据查询条件,获取包含所有满足条件的对象。
:rtype: list
"""
content = self._do_request(self.dump())
objs = []
for result in content["results"]:
obj = self._new_object()
obj._update_data(self._process_result(result))
objs.append(obj)
return objs
[文档] def scan(self, batch_size=None, scan_key=None):
params = self.dump()
if "skip" in params:
raise LeanCloudError(1, "Query.scan dose not support skip option")
if "limit" in params:
raise LeanCloudError(1, "Query.scan dose not support limit option")
return Cursor(self._query_class, batch_size, scan_key, params)
[文档] def count(self):
"""
返回满足查询条件的对象的数量。
:rtype: int
"""
params = self.dump()
params["limit"] = 0
params["count"] = 1
content = self._do_request(params)
return content["count"]
[文档] def skip(self, n):
"""
查询条件中跳过指定个数的对象,在做分页时很有帮助。
:param n: 需要跳过对象的个数
:rtype: Query
"""
self._skip = n
return self
[文档] def limit(self, n):
"""
设置查询返回结果的数量。如果不设置,默认为 100。最大返回数量为 1000,如果超过这个数量,需要使用多次查询来获取结果。
:param n: 限制结果的数量
:rtype: Query
"""
if n > 1000:
raise ValueError("limit only accept number less than or equal to 1000")
self._limit = n
return self
[文档] def include_acl(self, value=True):
"""
设置查询结果的对象,是否包含 ACL 字段。需要在控制台选项中开启对应选项才能生效。
:param value: 是否包含 ACL,默认为 True
:type value: bool
:rtype: Query
"""
self._include_acl = value
return self
[文档] def equal_to(self, key, value):
"""
增加查询条件,查询字段的值必须为指定值。
:param key: 查询条件的字段名
:param value: 查询条件的值
:rtype: Query
"""
self._where[key] = utils.encode(value)
return self
[文档] def size_equal_to(self, key, size):
"""
增加查询条件,限制查询结果指定数组字段长度与查询值相同
:param key: 查询条件数组字段名
:param size: 查询条件值
:rtype: Query
"""
self._add_condition(key, "$size", size)
return self
def _add_condition(self, key, condition, value):
if not self._where.get(key):
self._where[key] = {}
self._where[key][condition] = utils.encode(value)
return self
[文档] def not_equal_to(self, key, value):
"""
增加查询条件,限制查询结果指定字段的值与查询值不同
:param key: 查询条件字段名
:param value: 查询条件值
:rtype: Query
"""
self._add_condition(key, "$ne", value)
return self
[文档] def less_than(self, key, value):
"""
增加查询条件,限制查询结果指定字段的值小于查询值
:param key: 查询条件字段名
:param value: 查询条件值
:rtype: Query
"""
self._add_condition(key, "$lt", value)
return self
[文档] def greater_than(self, key, value):
"""
增加查询条件,限制查询结果指定字段的值大于查询值
:param key: 查询条件字段名
:param value: 查询条件值
:rtype: Query
"""
self._add_condition(key, "$gt", value)
return self
[文档] def less_than_or_equal_to(self, key, value):
"""
增加查询条件,限制查询结果指定字段的值小于等于查询值
:param key: 查询条件字段名
:param value: 查询条件值
:rtype: Query
"""
self._add_condition(key, "$lte", value)
return self
[文档] def greater_than_or_equal_to(self, key, value):
"""
增加查询条件,限制查询结果指定字段的值大于等于查询值
:param key: 查询条件字段名
:param value: 查询条件值名
:rtype: Query
"""
self._add_condition(key, "$gte", value)
return self
[文档] def contained_in(self, key, values):
"""
增加查询条件,限制查询结果指定字段的值在查询值列表中
:param key: 查询条件字段名
:param values: 查询条件值
:type values: list or tuple
:rtype: Query
"""
self._add_condition(key, "$in", values)
return self
[文档] def not_contained_in(self, key, values):
"""
增加查询条件,限制查询结果指定字段的值不在查询值列表中
:param key: 查询条件字段名
:param values: 查询条件值
:type values: list or tuple
:rtype: Query
"""
self._add_condition(key, "$nin", values)
return self
[文档] def contains_all(self, key, values):
"""
增加查询条件,限制查询结果指定字段的值全部包含与查询值列表中
:param key: 查询条件字段名
:param values: 查询条件值
:type values: list or tuple
:rtype: Query
"""
self._add_condition(key, "$all", values)
return self
[文档] def exists(self, key):
"""
增加查询条件,限制查询结果对象包含指定字段
:param key: 查询条件字段名
:rtype: Query
"""
self._add_condition(key, "$exists", True)
return self
[文档] def does_not_exist(self, key):
"""
增加查询条件,限制查询结果对象不包含指定字段
:param key: 查询条件字段名
:rtype: Query
"""
self._add_condition(key, "$exists", False)
return self
[文档] def matched(self, key, regex, ignore_case=False, multi_line=False):
"""
增加查询条件,限制查询结果对象指定字段满足指定的正则表达式。
:param key: 查询条件字段名
:param regex: 查询正则表达式
:param ignore_case: 查询是否忽略大小写,默认不忽略
:param multi_line: 查询是否匹配多行,默认不匹配
:rtype: Query
"""
if not isinstance(regex, six.string_types):
raise TypeError("matched only accept str or unicode")
self._add_condition(key, "$regex", regex)
modifiers = ""
if ignore_case:
modifiers += "i"
if multi_line:
modifiers += "m"
if modifiers:
self._add_condition(key, "$options", modifiers)
return self
[文档] def matches_query(self, key, query):
"""
增加查询条件,限制查询结果对象指定字段的值,与另外一个查询对象的返回结果相同。
:param key: 查询条件字段名
:param query: 查询对象
:type query: Query
:rtype: Query
"""
dumped = query.dump()
dumped["className"] = query._query_class._class_name
self._add_condition(key, "$inQuery", dumped)
return self
[文档] def does_not_match_query(self, key, query):
"""
增加查询条件,限制查询结果对象指定字段的值,与另外一个查询对象的返回结果不相同。
:param key: 查询条件字段名
:param query: 查询对象
:type query: Query
:rtype: Query
"""
dumped = query.dump()
dumped["className"] = query._query_class._class_name
self._add_condition(key, "$notInQuery", dumped)
return self
[文档] def matches_key_in_query(self, key, query_key, query):
"""
增加查询条件,限制查询结果对象指定字段的值,与另外一个查询对象的返回结果指定的值相同。
:param key: 查询条件字段名
:param query_key: 查询对象返回结果的字段名
:param query: 查询对象
:type query: Query
:rtype: Query
"""
dumped = query.dump()
dumped["className"] = query._query_class._class_name
self._add_condition(key, "$select", {"key": query_key, "query": dumped})
return self
[文档] def does_not_match_key_in_query(self, key, query_key, query):
"""
增加查询条件,限制查询结果对象指定字段的值,与另外一个查询对象的返回结果指定的值不相同。
:param key: 查询条件字段名
:param query_key: 查询对象返回结果的字段名
:param query: 查询对象
:type query: Query
:rtype: Query
"""
dumped = query.dump()
dumped["className"] = query._query_class._class_name
self._add_condition(key, "$dontSelect", {"key": query_key, "query": dumped})
return self
def _or_query(self, queries):
dumped = [q.dump()["where"] for q in queries]
self._where["$or"] = dumped
return self
def _and_query(self, queries):
dumped = [q.dump()["where"] for q in queries]
self._where["$and"] = dumped
def _quote(self, s):
# return "\\Q" + s.replace("\\E", "\\E\\\\E\\Q") + "\\E"
return s
[文档] def contains(self, key, value):
"""
增加查询条件,限制查询结果对象指定最短的值,包含指定字符串。在数据量比较大的情况下会比较慢。
:param key: 查询条件字段名
:param value: 需要包含的字符串
:rtype: Query
"""
self._add_condition(key, "$regex", self._quote(value))
return self
[文档] def startswith(self, key, value):
"""
增加查询条件,限制查询结果对象指定最短的值,以指定字符串开头。在数据量比较大的情况下会比较慢。
:param key: 查询条件字段名
:param value: 需要查询的字符串
:rtype: Query
"""
value = value if isinstance(value, six.text_type) else value.decode("utf-8")
self._add_condition(key, "$regex", "^" + self._quote(value))
return self
[文档] def endswith(self, key, value):
"""
增加查询条件,限制查询结果对象指定最短的值,以指定字符串结尾。在数据量比较大的情况下会比较慢。
:param key: 查询条件字段名
:param value: 需要查询的字符串
:rtype: Query
"""
value = value if isinstance(value, six.text_type) else value.decode("utf-8")
self._add_condition(key, "$regex", self._quote(value) + "$")
return self
[文档] def ascending(self, key):
"""
限制查询返回结果以指定字段升序排序。
:param key: 排序字段名
:rtype: Query
"""
self._order = [key]
return self
[文档] def add_ascending(self, key):
"""
增加查询排序条件。之前指定的排序条件优先级更高。
:param key: 排序字段名
:rtype: Query
"""
self._order.append(key)
return self
[文档] def descending(self, key):
"""
限制查询返回结果以指定字段降序排序。
:param key: 排序字段名
:rtype: Query
"""
self._order = ["-{0}".format(key)]
return self
[文档] def add_descending(self, key):
"""
增加查询排序条件。之前指定的排序条件优先级更高。
:param key: 排序字段名
:rtype: Query
"""
self._order.append("-{0}".format(key))
return self
[文档] def near(self, key, point):
"""
增加查询条件,限制返回结果指定字段值的位置与给定地理位置临近。
:param key: 查询条件字段名
:param point: 需要查询的地理位置
:rtype: Query
"""
if point is None:
raise ValueError("near query does not accept None")
self._add_condition(key, "$nearSphere", point)
return self
[文档] def within_radians(self, key, point, max_distance, min_distance=None):
"""
增加查询条件,限制返回结果指定字段值的位置在某点的一段距离之内。
:param key: 查询条件字段名
:param point: 查询地理位置
:param max_distance: 最大距离限定(弧度)
:param min_distance: 最小距离限定(弧度)
:rtype: Query
"""
self.near(key, point)
self._add_condition(key, "$maxDistance", max_distance)
if min_distance is not None:
self._add_condition(key, "$minDistance", min_distance)
return self
[文档] def within_miles(self, key, point, max_distance, min_distance=None):
"""
增加查询条件,限制返回结果指定字段值的位置在某点的一段距离之内。
:param key: 查询条件字段名
:param point: 查询地理位置
:param max_distance: 最大距离限定(英里)
:param min_distance: 最小距离限定(英里)
:rtype: Query
"""
if min_distance is not None:
min_distance = min_distance / 3958.8
return self.within_radians(key, point, max_distance / 3958.8, min_distance)
[文档] def within_kilometers(self, key, point, max_distance, min_distance=None):
"""
增加查询条件,限制返回结果指定字段值的位置在某点的一段距离之内。
:param key: 查询条件字段名
:param point: 查询地理位置
:param max_distance: 最大距离限定(千米)
:param min_distance: 最小距离限定(千米)
:rtype: Query
"""
if min_distance is not None:
min_distance = min_distance / 6371.0
return self.within_radians(key, point, max_distance / 6371.0, min_distance)
[文档] def within_geo_box(self, key, southwest, northeast):
"""
增加查询条件,限制返回结果指定字段值的位置在指定坐标范围之内。
:param key: 查询条件字段名
:param southwest: 限制范围西南角坐标
:param northeast: 限制范围东北角坐标
:rtype: Query
"""
self._add_condition(key, "$within", {"$box": [southwest, northeast]})
return self
[文档] def include(self, *keys):
"""
指定查询返回结果中包含关联表字段。
:param keys: 关联子表字段名
:rtype: Query
"""
if len(keys) == 1 and isinstance(keys[0], (list, tuple)):
keys = keys[0]
self._include += keys
return self
[文档] def select(self, *keys):
"""
指定查询返回结果中只包含某些字段。可以重复调用,每次调用的包含内容都将会被返回。
:param keys: 包含字段名
:rtype: Query
"""
if len(keys) == 1 and isinstance(keys[0], (list, tuple)):
keys = keys[0]
self._select += keys
return self
[文档]class FriendshipQuery(Query):
def __init__(self, query_class):
super(FriendshipQuery, self).__init__(query_class)
if query_class in ("_Follower", "Follower"):
self._friendship_tag = "follower"
elif query_class in ("_Followee", "Followee"):
self._friendship_tag = "followee"
else:
raise TypeError("FriendshipQuery takes only follower or followee")
def _new_object(self):
return leancloud.User()
def _process_result(self, obj):
content = obj[self._friendship_tag]
if content["__type"] == "Pointer" and content["className"] == "_User":
del content["__type"]
del content["className"]
return content