import sys
import functools
import json
import shutil
import urllib
import os
from .encoder import CustomJSONEncoder
from .__version__ import __version__
from .exception import (
TooBigFileException,
NotAuthenticatedException,
NotAllowedException,
MethodNotAllowedException,
ParameterException,
RouteNotFoundException,
ServerErrorException,
UploadFailedException,
)
if sys.version_info[0] == 3:
from json import JSONDecodeError
else:
JSONDecodeError = ValueError
DEBUG = os.getenv("GAZU_DEBUG", "false").lower() == "true"
class KitsuClient(object):
def __init__(
self,
host,
ssl_verify=True,
cert=None,
automatic_refresh_token=False,
callback_not_authenticated=None,
):
self.tokens = {"access_token": "", "refresh_token": ""}
self.session = requests.Session()
self.session.verify = ssl_verify
self.session.cert = cert
self.host = host
self.event_host = host
self.automatic_refresh_token = automatic_refresh_token
self.callback_not_authenticated = callback_not_authenticated
def create_client(
host,
ssl_verify=True,
cert=None,
automatic_refresh_token=False,
callback_not_authenticated=None,
):
return KitsuClient(
host,
ssl_verify,
cert=cert,
automatic_refresh_token=automatic_refresh_token,
callback_not_authenticated=callback_not_authenticated,
)
default_client = None
try:
import requests
# Little hack to allow json encoder to manage dates.
requests.models.complexjson.dumps = functools.partial(
json.dumps, cls=CustomJSONEncoder
)
host = "http://gazu.change.serverhost/api"
default_client = create_client(host)
except Exception:
print("Warning, running in setup mode!")
[docs]
def host_is_up(client=default_client):
"""
Returns:
True if the host is up.
"""
try:
response = client.session.head(client.host)
except Exception:
return False
return response.status_code == 200
[docs]
def host_is_valid(client=default_client):
"""
Check if the host is valid by simulating a fake login.
Returns:
True if the host is valid.
"""
if not host_is_up(client):
return False
try:
post("auth/login", {"email": ""})
except Exception as exc:
return isinstance(exc, (NotAuthenticatedException, ParameterException))
[docs]
def get_host(client=default_client):
"""
Returns:
Host on which requests are sent.
"""
return client.host
[docs]
def get_api_url_from_host(client=default_client):
"""
Returns:
Zou url, retrieved from host.
"""
return client.host[:-4]
[docs]
def set_host(new_host, client=default_client):
"""
Returns:
Set currently configured host on which requests are sent.
"""
client.host = new_host
return client.host
[docs]
def get_event_host(client=default_client):
"""
Returns:
Host on which listening for events.
"""
return client.event_host or client.host
[docs]
def set_event_host(new_host, client=default_client):
"""
Returns:
Set currently configured host on which listening for events.
"""
client.event_host = new_host
return client.event_host
[docs]
def set_tokens(new_tokens, client=default_client):
"""
Store authentication token to reuse them for all requests.
Args:
new_tokens (dict): Tokens to use for authentication.
"""
client.tokens = new_tokens
return client.tokens
[docs]
def url_path_join(*items):
"""
Make it easier to build url path by joining every arguments with a '/'
character.
Args:
items (list): Path elements
"""
return "/".join([item.lstrip("/").rstrip("/") for item in items])
[docs]
def get_full_url(path, client=default_client):
"""
Args:
path (str): The path to integrate to host url.
Returns:
The result of joining configured host url with given path.
"""
return url_path_join(get_host(client), path)
[docs]
def build_path_with_params(path, params):
"""
Add params to a path using urllib encoding
Args:
path (str): The url base path
params (dict): The parameters to add as a dict
Returns:
str: the builded path
"""
if not params:
return path
if hasattr(urllib, "urlencode"):
path = "%s?%s" % (path, urllib.urlencode(params))
else:
path = "%s?%s" % (path, urllib.parse.urlencode(params))
return path
[docs]
def get(path, json_response=True, params=None, client=default_client):
"""
Run a get request toward given path for configured host.
Returns:
The request result.
"""
if DEBUG:
print("GET", get_full_url(path, client))
path = build_path_with_params(path, params)
retry = True
while retry:
response = client.session.get(
get_full_url(path, client=client),
headers=make_auth_header(client=client),
)
_, retry = check_status(response, path, client=client)
if json_response:
return response.json()
else:
return response.text
[docs]
def post(path, data, client=default_client):
"""
Run a post request toward given path for configured host.
Returns:
The request result.
"""
if DEBUG:
print("POST", get_full_url(path, client))
if not "password" in data:
print("Body:", data)
retry = True
while retry:
response = client.session.post(
get_full_url(path, client),
json=data,
headers=make_auth_header(client=client),
)
_, retry = check_status(response, path, client=client)
try:
result = response.json()
except JSONDecodeError:
print(response.text)
raise
return result
[docs]
def put(path, data, client=default_client):
"""
Run a put request toward given path for configured host.
Returns:
The request result.
"""
if DEBUG:
print("PUT", get_full_url(path, client))
print("Body:", data)
retry = True
while retry:
response = client.session.put(
get_full_url(path, client),
json=data,
headers=make_auth_header(client=client),
)
_, retry = check_status(response, path, client=client)
return response.json()
[docs]
def delete(path, params=None, client=default_client):
"""
Run a delete request toward given path for configured host.
Returns:
The request result.
"""
if DEBUG:
print("DELETE", get_full_url(path, client))
path = build_path_with_params(path, params)
retry = True
while retry:
response = client.session.delete(
get_full_url(path, client), headers=make_auth_header(client=client)
)
_, retry = check_status(response, path, client=client)
return response.text
[docs]
def get_message_from_response(
response, default_message="No additional information"
):
"""
A utility function that handles Zou's inconsistent message keys.
For a given request, checks if any error messages or regular messages were given and returns their value.
If no messages are found, returns a default message.
Args:
response: requests.Request - A response to check.
default_message: str - An optional default value to revert to if no message is detected.
Returns:
The message of a given response, or a default message - if any.
"""
message = default_message
message_json = response.json()
for key in ["error", "message"]:
if message_json.get(key):
message = message_json[key]
break
return message
[docs]
def check_status(request, path, client=None):
"""
Raise an exception related to status code, if the status code does not
match a success code. Print error message when it's relevant.
Args:
request (Request): The request to validate.
Returns:
int: Status code
Raises:
ParameterException: when 400 response occurs
NotAuthenticatedException: when 401 response occurs
RouteNotFoundException: when 404 response occurs
NotAllowedException: when 403 response occurs
MethodNotAllowedException: when 405 response occurs
TooBigFileException: when 413 response occurs
ServerErrorException: when 500 response occurs
"""
status_code = request.status_code
if status_code == 404:
raise RouteNotFoundException(path)
elif status_code == 403:
raise NotAllowedException(path)
elif status_code == 400:
raise ParameterException(path, get_message_from_response(request))
elif status_code == 405:
raise MethodNotAllowedException(path)
elif status_code == 413:
raise TooBigFileException(
"%s: You send a too big file. "
"Change your proxy configuration to allow bigger files." % path
)
elif status_code in [401, 422]:
try:
if client is not None and client.automatic_refresh_token:
from . import refresh_token
refresh_token(client=client)
return status_code, True
else:
raise NotAuthenticatedException(path)
except NotAuthenticatedException:
if (
client is not None
and client.callback_not_authenticated is not None
):
retry = client.callback_not_authenticated(client, path)
if retry:
return status_code, True
raise
elif status_code in [500, 502]:
try:
print("A server error occured!\n")
stacktrace = request.json().get(
"stacktrace", "No stacktrace sent by the server"
)
print("Server stacktrace:\n%s" % stacktrace)
message = get_message_from_response(
response=request,
default_message="No message sent by the server",
)
print("Error message:\n%s\n" % message)
except Exception:
print(request.text)
raise ServerErrorException(path)
return status_code, False
[docs]
def fetch_all(path, params=None, client=default_client):
"""
Args:
path (str): The path for which we want to retrieve all entries.
Returns:
list: All entries stored in database for a given model. You can add a
filter to the model name like this: "tasks?project_id=project-id"
"""
return get(url_path_join("data", path), params=params, client=client)
[docs]
def fetch_first(path, params=None, client=default_client):
"""
Args:
path (str): The path for which we want to retrieve the first entry.
Returns:
dict: The first entry for which a model is required.
"""
entries = get(url_path_join("data", path), params=params, client=client)
if len(entries) > 0:
return entries[0]
else:
return None
[docs]
def fetch_one(model_name, id, client=default_client):
"""
Function dedicated at targeting routes that returns a single model
instance.
Args:
model_name (str): Model type name.
id (str): Model instance ID.
Returns:
dict: The model instance matching id and model name.
"""
return get(url_path_join("data", model_name, id), client=client)
[docs]
def create(model_name, data, client=default_client):
"""
Create an entry for given model and data.
Args:
model (str): The model type involved
data (str): The data to use for creation
Returns:
dict: Created entry
"""
return post(url_path_join("data", model_name), data, client=client)
[docs]
def update(model_name, model_id, data, client=default_client):
"""
Update an entry for given model, id and data.
Args:
model (str): The model type involved
id (str): The target model id
data (dict): The data to update
Returns:
dict: Updated entry
"""
return put(
url_path_join("data", model_name, model_id), data, client=client
)
[docs]
def upload(path, file_path, data={}, extra_files=[], client=default_client):
"""
Upload file located at *file_path* to given url *path*.
Args:
path (str): The url path to upload file.
file_path (str): The file location on the hard drive.
Returns:
Response: Request response object.
"""
url = get_full_url(path, client)
files = _build_file_dict(file_path, extra_files)
retry = True
while retry:
response = client.session.post(
url,
data=data,
headers=make_auth_header(client=client),
files=files,
)
_, retry = check_status(response, path, client=client)
try:
result = response.json()
except JSONDecodeError:
print(response.text)
raise
result_message = get_message_from_response(response, default_message="")
if result_message:
raise UploadFailedException(result_message)
return result
def _build_file_dict(file_path, extra_files):
files = {"file": open(file_path, "rb")}
i = 2
for file_path in extra_files:
files["file-%s" % i] = open(file_path, "rb")
i += 1
return files
[docs]
def download(path, file_path, params=None, client=default_client):
"""
Download file located at *file_path* to given url *path*.
Args:
path (str): The url path to download file from.
file_path (str): The location to store the file on the hard drive.
Returns:
Response: Request response object.
"""
path = build_path_with_params(path, params)
with client.session.get(
get_full_url(path, client),
headers=make_auth_header(client=client),
stream=True,
) as response:
with open(file_path, "wb") as target_file:
shutil.copyfileobj(response.raw, target_file)
return response
[docs]
def get_file_data_from_url(url, full=False, client=default_client):
"""
Return data found at given url.
"""
if not full:
url = get_full_url(url)
retry = True
while retry:
response = requests.get(
url,
stream=True,
headers=make_auth_header(client=client),
)
_, retry = check_status(response, url, client=client)
return response.content
[docs]
def import_data(model_name, data, client=default_client):
"""
Args:
model_name (str): The data model to import
data (dict): The data to import
"""
return post("/import/kitsu/%s" % model_name, data, client=client)
[docs]
def get_api_version(client=default_client):
"""
Returns:
str: Current version of the API.
"""
return get("", client=client)["version"]
[docs]
def get_current_user(client=default_client):
"""
Returns:
dict: User database information for user linked to auth tokens.
"""
return get("auth/authenticated", client=client)["user"]