upend/sdks/py/upend/upend.py

112 lines
3.2 KiB
Python

from typing import Any, Dict, Optional, Tuple, Union
from upend.util import escape
from upend.lib import LiveServerSession
import logging
from dataclasses import dataclass
@dataclass
class UpEndEntry:
entity: Optional[str]
attribute: Optional[str]
value: Optional[Union[str, int]]
def as_sexp(self) -> str:
return f"(matches {self._arg(self.entity)} {self._arg(self.attribute)} {self._arg(self.value)})"
@staticmethod
def _arg(arg: Optional[Union[str, int]]) -> str:
return f'"{escape(str(arg))}"' if arg else "?"
def __str__(self) -> str:
return self.as_sexp()
class UpEndError(RuntimeError):
pass
class UpEndCheckError(UpEndError):
pass
UpEndOptionalTriplet = Tuple[Optional[str], Optional[str], Optional[str]]
UpEndTriplet = Tuple[Optional[str], str, Union[str, int]]
class UpEnd:
def __init__(
self,
hostname: str = "localhost",
port: int = 8093,
ssl: bool = False,
initial_check: bool = True,
) -> None:
self.logger = logging.getLogger("upend")
self.session = LiveServerSession(
f"{'https' if ssl else 'http'}://{hostname}:{port}/api/"
)
if initial_check:
self.check()
def check(self) -> bool:
info = self.session.get("info")
if info.status_code != 200:
self.logger.error("Connection check failed!")
raise UpEndCheckError(info.text)
else:
self.logger.debug("Connection check passed successfully.")
return True
def query(
self, query: Union[UpEndOptionalTriplet, UpEndEntry, str]
) -> Dict[str, Any]:
query_out = None
if type(query) is UpEndEntry:
query_out = query.as_sexp()
if type(query) is tuple:
query_out = UpEndEntry(*query).as_sexp()
if type(query) is str:
query_out = query
if query_out is None:
raise RuntimeError("Incorrect argument type.")
self.logger.debug(f"Querying: {query_out}")
result = self.session.get("obj", params={"query": query_out})
if not result.ok:
raise UpEndError(result.text)
return result.json()
def get_raw(self, address: str, chunk_size: int = 8192):
request = self.session.get(f"raw/{address}", stream=True)
request.raise_for_status()
for chunk in request.iter_content(chunk_size=chunk_size):
yield chunk
def insert(
self, entry: Union[UpEndTriplet, UpEndEntry], value_type: str = "Value"
) -> Dict[str, Any]:
entry_out = None
if type(entry) is tuple:
entry_out = UpEndEntry(*entry)
if type(entry) is UpEndEntry:
entry_out = entry
if entry_out is None:
raise RuntimeError("Incorrect argument type.")
self.logger.debug(f"Inserting: {entry_out.as_sexp()}")
request = self.session.put(
f"obj",
json={
"entity": entry_out.entity,
"attribute": entry_out.attribute,
"value": {"t": value_type, "c": entry_out.value},
},
)
request.raise_for_status()
return request.json()