import sys
import functools
import json
import shutil
import os
from .encoder import CustomJSONEncoder
from .__version__ import __version__
from .exception import (
TooBigFileException,
NotAuthenticatedException,
NotAllowedException,
MethodNotAllowedException,
ParameterException,
RouteNotFoundException,
ServerErrorException,
UploadFailedException,
)
if sys.version_info[0] == 3:
from json import JSONDecodeError
from urllib.parse import urlencode
else:
JSONDecodeError = ValueError
from urllib import urlencode
DEBUG = os.getenv("GAZU_DEBUG", "false").lower() == "true"
class KitsuClient(object):
def __init__(
self,
host,
ssl_verify=True,
cert=None,
use_refresh_token=True,
callback_not_authenticated=None,
tokens={"access_token": None, "refresh_token": None},
access_token=None,
refresh_token=None,
):
self.tokens = tokens
if access_token:
self.access_token = access_token
if refresh_token:
self.refresh_token = refresh_token
self.use_refresh_token = use_refresh_token
self.callback_not_authenticated = callback_not_authenticated
self.session = requests.Session()
self.session.verify = ssl_verify
self.session.cert = cert
self.host = host
self.event_host = host
@property
def access_token(self):
return self.tokens.get("access_token", None)
@access_token.setter
def access_token(self, token):
self.tokens["access_token"] = token
@property
def refresh_token(self):
return self.tokens.get("refresh_token", None)
@refresh_token.setter
def refresh_token(self, token):
self.tokens["refresh_token"] = token
def refresh_access_token(self):
"""
Refresh access tokens for this client.
Returns:
dict: The new access token.
"""
response = self.session.get(
get_full_url("auth/refresh-token", client=self),
headers={
"User-Agent": "CGWire Gazu " + __version__,
"Authorization": "Bearer " + self.refresh_token,
},
)
check_status(response, "auth/refresh-token")
tokens = response.json()
self.access_token = tokens["access_token"]
self.refresh_token = None
return tokens
def make_auth_header(self):
"""
Make headers required to authenticate.
Returns:
dict: Headers required to authenticate.
"""
headers = {"User-Agent": "CGWire Gazu " + __version__}
if self.access_token:
headers["Authorization"] = "Bearer " + self.access_token
return headers
[docs]
def create_client(
host,
ssl_verify=True,
cert=None,
use_refresh_token=False,
callback_not_authenticated=None,
**kwargs
):
"""
Create a client with given parameters.
Args:
host (str): The host to use for requests.
ssl_verify (bool): Whether to verify SSL certificates.
cert (str): Path to a client certificate.
use_refresh_token (bool): Whether to automatically refresh tokens.
callback_not_authenticated (function): Function to call when not authenticated.
Returns:
KitsuClient: The created client.
"""
return KitsuClient(
host,
ssl_verify,
cert=cert,
use_refresh_token=use_refresh_token,
callback_not_authenticated=callback_not_authenticated,
**kwargs
)
default_client = None
try:
import requests
# Little hack to allow json encoder to manage dates.
requests.models.complexjson.dumps = functools.partial(
json.dumps, cls=CustomJSONEncoder
)
host = "http://gazu.change.serverhost/api"
default_client = create_client(host)
except Exception:
print("Warning, running in setup mode!")
[docs]
def host_is_up(client=default_client):
"""
Check if the host is up.
Args:
client (KitsuClient): The client to use for the request.
Returns:
bool: True if the host is up.
"""
try:
response = client.session.head(client.host)
except Exception:
return False
return response.status_code == 200
[docs]
def host_is_valid(client=default_client):
"""
Check if the host is valid by simulating a fake login.
Args:
client (KitsuClient): The client to use for the request.
Returns:
bool: True if the host is valid.
"""
if not host_is_up(client):
return False
try:
post("auth/login", {"email": ""})
except Exception as exc:
return isinstance(exc, (NotAuthenticatedException, ParameterException))
[docs]
def get_host(client=default_client):
"""
Get client.host.
Args:
client (KitsuClient): The client to use for the request.
Returns:
str: The host of the client.
"""
return client.host
[docs]
def get_api_url_from_host(client=default_client):
"""
Get the API url from the host.
Args:
client (KitsuClient): The client to use for the request.
Returns:
Zou url, retrieved from host.
"""
return client.host[:-4]
[docs]
def set_host(new_host, client=default_client):
"""
Set the host for the client.
Args:
new_host (str): The new host to set.
client (KitsuClient): The client to use for the request.
Returns:
str: The new host.
"""
client.host = new_host
return client.host
[docs]
def get_event_host(client=default_client):
"""
Get the host on which listening for events.
Args:
client (KitsuClient): The client to use for the request.
Returns:
str: The event host.
"""
return client.event_host or client.host
[docs]
def set_event_host(new_host, client=default_client):
"""
Set the host on which listening for events.
Args:
new_host (str): The new host to set.
client (KitsuClient): The client to use for the request.
Returns:
str: The new event host.
"""
client.event_host = new_host
return client.event_host
[docs]
def set_tokens(new_tokens, client=default_client):
"""
Store authentication token to reuse them for all requests.
Args:
new_tokens (dict): Tokens to use for authentication.
client (KitsuClient): The client to use for the request.
Returns:
dict: The stored tokens.
"""
client.tokens = new_tokens
return client.tokens
[docs]
def url_path_join(*items):
"""
Make it easier to build url path by joining every arguments with a '/'
character.
Args:
items (list): Path elements
Returns:
str: The joined path.
"""
return "/".join([item.lstrip("/").rstrip("/") for item in items])
[docs]
def get_full_url(path, client=default_client):
"""
Join host url with given path.
Args:
path (str): The path to integrate to host url.
Returns:
The result of joining configured host url with given path.
"""
return url_path_join(get_host(client), path)
[docs]
def build_path_with_params(path, params):
"""
Add params to a path using urllib encoding.
Args:
path (str): The url base path
params (dict): The parameters to add as a dict
Returns:
str: the builded path
"""
if not params:
return path
query_string = urlencode(params)
if query_string:
# Support base paths that already contain query parameters.
path += "&" if "?" in path else "?"
path += query_string
return path
[docs]
def get(path, json_response=True, params=None, client=default_client):
"""
Run a get request toward given path for configured host.
Args:
path (str): The path to query.
json_response (bool): Whether to return a json response.
params (dict): The parameters to pass to the request.
client (KitsuClient): The client to use for the request.
Returns:
The request result.
"""
if DEBUG:
print("GET", get_full_url(path, client))
path = build_path_with_params(path, params)
retry = True
while retry:
response = client.session.get(
get_full_url(path, client=client),
headers=make_auth_header(client=client),
)
_, retry = check_status(response, path, client=client)
if json_response:
return response.json()
else:
return response.text
[docs]
def post(path, data, client=default_client):
"""
Run a post request toward given path for configured host.
Args:
path (str): The path to query.
data (dict): The data to post.
client (KitsuClient): The client to use for the request.
Returns:
The request result.
"""
if DEBUG:
print("POST", get_full_url(path, client))
if not "password" in data:
print("Body:", data)
retry = True
while retry:
response = client.session.post(
get_full_url(path, client),
json=data,
headers=make_auth_header(client=client),
)
_, retry = check_status(response, path, client=client)
try:
result = response.json()
except JSONDecodeError:
print(response.text)
raise
return result
[docs]
def put(path, data, client=default_client):
"""
Run a put request toward given path for configured host.
Args:
path (str): The path to query.
data (dict): The data to put.
client (KitsuClient): The client to use for the request.
Returns:
The request result.
"""
if DEBUG:
print("PUT", get_full_url(path, client))
print("Body:", data)
retry = True
while retry:
response = client.session.put(
get_full_url(path, client),
json=data,
headers=make_auth_header(client=client),
)
_, retry = check_status(response, path, client=client)
return response.json()
[docs]
def delete(path, params=None, client=default_client):
"""
Run a delete request toward given path for configured host.
Args:
path (str): The path to query.
params (dict): The parameters to pass to the request.
client (KitsuClient): The client to use for the request.
Returns:
The request result.
"""
if DEBUG:
print("DELETE", get_full_url(path, client))
path = build_path_with_params(path, params)
retry = True
while retry:
response = client.session.delete(
get_full_url(path, client), headers=make_auth_header(client=client)
)
_, retry = check_status(response, path, client=client)
return response.text
[docs]
def get_message_from_response(
response, default_message="No additional information"
):
"""
A utility function that handles Zou's inconsistent message keys.
For a given request, checks if any error messages or regular messages were given and returns their value.
If no messages are found, returns a default message.
Args:
response: requests.Request - A response to check.
default_message: str - An optional default value to revert to if no message is detected.
Returns:
str: The message to display to the user.
"""
message = default_message
message_json = response.json()
if hasattr(message_json, "get"):
for key in ["error", "message"]:
if message_json.get(key):
message = message_json[key]
break
return message
[docs]
def check_status(request, path, client=None):
"""
Raise an exception related to status code, if the status code does not
match a success code. Print error message when it's relevant.
Args:
request (Request): The request to validate.
path (str): The path of the request.
client (KitsuClient): The client to use for the request.
Returns:
int: Status code
Raises:
ParameterException: when 400 response occurs
NotAuthenticatedException: when 401 response occurs
RouteNotFoundException: when 404 response occurs
NotAllowedException: when 403 response occurs
MethodNotAllowedException: when 405 response occurs
TooBigFileException: when 413 response occurs
ServerErrorException: when 500 response occurs
"""
status_code = request.status_code
if status_code == 404:
raise RouteNotFoundException(path)
elif status_code == 403:
raise NotAllowedException(path)
elif status_code == 400:
raise ParameterException(path, get_message_from_response(request))
elif status_code == 405:
raise MethodNotAllowedException(path)
elif status_code == 413:
raise TooBigFileException(
"%s: You send a too big file. "
"Change your proxy configuration to allow bigger files." % path
)
elif status_code in [401, 422]:
try:
if (
client
and client.refresh_token
and client.use_refresh_token
and request.json()["message"] == "Signature has expired"
):
client.refresh_access_token()
return status_code, True
else:
raise NotAuthenticatedException(path)
except NotAuthenticatedException:
if client and client.callback_not_authenticated:
retry = client.callback_not_authenticated(client, path)
if retry:
return status_code, True
raise
elif status_code in [500, 502]:
try:
print("A server error occured!\n")
stacktrace = request.json().get(
"stacktrace", "No stacktrace sent by the server"
)
print("Server stacktrace:\n%s" % stacktrace)
message = get_message_from_response(
response=request,
default_message="No message sent by the server",
)
print("Error message:\n%s\n" % message)
except Exception:
print(request.text)
raise ServerErrorException(path)
return status_code, False
[docs]
def fetch_all(
path, params=None, client=default_client, paginated=False, limit=None
):
"""
Args:
path (str): The path for which we want to retrieve all entries.
params (dict): The parameters to pass to the request.
client (KitsuClient): The client to use for the request.
paginated (bool): Will query entries page by page.
limit (int): Limit the number of entries per page.
Returns:
list: All entries stored in database for a given model. You can add a
filter to the model name like this: "tasks?project_id=project-id"
"""
if paginated:
if not params:
params = {}
params["page"] = 1
if limit is not None:
params["limit"] = limit
url = url_path_join("data", path)
response = get(url, params=params, client=client)
if not paginated:
return response
nb_pages = response.get("nb_pages", 1)
current_page = response.get("page", 1)
results = response.get("data", [])
if current_page != nb_pages:
for page in range(2, nb_pages + 1):
params["page"] = page
response = get(
url,
params=params,
client=client,
)
results += response.get("data", [])
return results
[docs]
def fetch_first(path, params=None, client=default_client):
"""
Args:
path (str): The path for which we want to retrieve the first entry.
params (dict): The parameters to pass to the request.
client (KitsuClient): The client to use for the request.
Returns:
dict: The first entry for which a model is required.
"""
entries = get(url_path_join("data", path), params=params, client=client)
if len(entries) > 0:
return entries[0]
else:
return None
[docs]
def fetch_one(model_name, id, params=None, client=default_client):
"""
Function dedicated at targeting routes that returns a single model
instance.
Args:
model_name (str): Model type name.
id (str): Model instance ID.
params (dict): The parameters to pass to the request.
client (KitsuClient): The client to use for the request.
Returns:
dict: The model instance matching id and model name.
"""
return get(
url_path_join("data", model_name, id), params=params, client=client
)
[docs]
def create(model_name, data, client=default_client):
"""
Create an entry for given model and data.
Args:
model_name (str): The model type involved.
data (str): The data to use for creation.
client (KitsuClient): The client to use for the request.
Returns:
dict: Created entry
"""
return post(url_path_join("data", model_name), data, client=client)
[docs]
def update(model_name, model_id, data, client=default_client):
"""
Update an entry for given model, id and data.
Args:
model_name (str): The model type involved.
model_id (str): The target model id.
data (dict): The data to update.
client (KitsuClient): The client to use for the request.
Returns:
dict: Updated entry
"""
return put(
url_path_join("data", model_name, model_id), data, client=client
)
[docs]
def upload(
path,
file_path=None,
data={},
extra_files=[],
files=None,
client=default_client,
):
"""
Upload file located at *file_path* to given url *path*.
Args:
path (str): The url path to upload file.
file_path (str): The file location on the hard drive.
data (dict): The data to send with the file.
extra_files (list): List of extra files to upload.
files (dict): The dictionary of files to upload.
client (KitsuClient): The client to use for the request.
Returns:
Response: Request response object.
"""
url = get_full_url(path, client)
if not files:
files = _build_file_dict(file_path, extra_files)
retry = True
while retry:
response = client.session.post(
url,
data=data,
headers=make_auth_header(client=client),
files=files,
)
_, retry = check_status(response, path, client=client)
try:
result = response.json()
except JSONDecodeError:
print(response.text)
raise
result_message = get_message_from_response(response, default_message="")
if result_message:
raise UploadFailedException(result_message)
return result
def _build_file_dict(file_path, extra_files):
"""
Build a dictionary of files to upload.
Args:
file_path (str): The file location on the hard drive.
extra_files (list): List of extra files to upload.
Returns:
dict: The dictionary of files to upload.
"""
files = {"file": open(file_path, "rb")}
i = 0
for file_path in extra_files:
i += 1
files["file-%s" % i] = open(file_path, "rb")
return files
[docs]
def download(path, file_path, params=None, client=default_client):
"""
Download file located at *file_path* to given url *path*.
Args:
path (str): The url path to download file from.
file_path (str): The location to store the file on the hard drive.
params (dict): The parameters to pass to the request.
client (KitsuClient): The client to use for the request.
Returns:
Response: Request response object.
"""
path = build_path_with_params(path, params)
with client.session.get(
get_full_url(path, client),
headers=make_auth_header(client=client),
stream=True,
) as response:
with open(file_path, "wb") as target_file:
shutil.copyfileobj(response.raw, target_file)
return response
[docs]
def get_file_data_from_url(url, full=False, client=default_client):
"""
Return data found at given url.
Args:
url (str): The url to fetch data from.
full (bool): Whether to use full url.
client (KitsuClient): The client to use for the request.
Returns:
bytes: The data found at the given url.
"""
if not full:
url = get_full_url(url)
retry = True
while retry:
response = requests.get(
url,
stream=True,
headers=make_auth_header(client=client),
)
_, retry = check_status(response, url, client=client)
return response.content
[docs]
def import_data(model_name, data, client=default_client):
"""
Import data for given model.
Args:
model_name (str): The data model to import.
data (dict): The data to import.
client (KitsuClient): The client to use for the request.
Returns:
dict: The imported data.
"""
return post("/import/kitsu/%s" % model_name, data, client=client)
[docs]
def get_api_version(client=default_client):
"""
Get the current version of the API.
Args:
client (KitsuClient): The client to use for the request.
Returns:
str: Current version of the API.
"""
return get("", client=client)["version"]
[docs]
def get_current_user(client=default_client):
"""
Get the current user.
Args:
client (KitsuClient): The client to use for the request.
Returns:
dict: User database information for user linked to auth tokens.
"""
return get("auth/authenticated", client=client)["user"]