@task
async def execute_endpoint(
    endpoint: str,
    databricks_credentials: "DatabricksCredentials",
    http_method: HTTPMethod = HTTPMethod.GET,
    params: Dict[str, Any] = None,
    json: Dict[str, Any] = None,
    **kwargs: Dict[str, Any],
) -> httpx.Response:
    """
    Generic function for executing REST endpoints.

    Args:
        endpoint: The endpoint route.
        databricks_credentials: Credentials to use for authentication with Databricks.
        http_method: Either GET, POST, PUT, DELETE, or PATCH.
        params: URL query parameters in the request.
        json: JSON serializable object to include in the body of the request.
        **kwargs: Additional keyword arguments to pass.

    Returns:
        The httpx.Response from interacting with the endpoint.

    Examples:
        Lists jobs on the Databricks instance.
        ```python
        from prefect import flow
        from prefect_databricks import DatabricksCredentials
        from prefect_databricks.rest import execute_endpoint

        @flow
        def example_execute_endpoint_flow():
            endpoint = "/2.1/jobs/list"
            databricks_credentials = DatabricksCredentials.load("my-block")
            params = {
                "limit": 5,
                "offset": None,
                "expand_tasks": True,
            }
            response = execute_endpoint(
                endpoint,
                databricks_credentials,
                params=params
            )
            return response.json()
        ```
    """
    if isinstance(http_method, HTTPMethod):
        http_method = http_method.value

    if params is not None:
        stripped_params = strip_kwargs(**params)
    else:
        stripped_params = None

    if json is not None:
        kwargs["json"] = strip_kwargs(**json)

    async with databricks_credentials.get_client() as client:
        response = await getattr(client, http_method)(
            endpoint, params=stripped_params, **kwargs
        )

    return response
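The method dispatch in `execute_endpoint` relies on `getattr(client, http_method)` resolving to a client method of the same name. The following is a minimal standalone sketch of that pattern only. `FakeClient` and `dispatch` are hypothetical stand-ins, and the lowercase enum values are an assumption about how `HTTPMethod` is defined, not something shown in the listing above.

```python
import asyncio
from enum import Enum
from typing import Any, Dict, Optional


class HTTPMethod(Enum):
    # Assumption: enum values are lowercase so they match client method names.
    GET = "get"
    POST = "post"


class FakeClient:
    """Hypothetical stand-in for an async HTTP client; it echoes the call back."""

    async def get(
        self, endpoint: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> Dict[str, Any]:
        return {"method": "get", "endpoint": endpoint, "params": params, **kwargs}

    async def post(
        self, endpoint: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> Dict[str, Any]:
        return {"method": "post", "endpoint": endpoint, "params": params, **kwargs}


async def dispatch(
    client: FakeClient,
    endpoint: str,
    http_method: Any = HTTPMethod.GET,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    # Accept either the enum member or a plain string such as "get".
    if isinstance(http_method, HTTPMethod):
        http_method = http_method.value
    # Look up the coroutine method by name and await it.
    return await getattr(client, http_method)(endpoint, params=params, **kwargs)


get_result = asyncio.run(
    dispatch(FakeClient(), "/2.1/jobs/list", params={"limit": 5})
)
post_result = asyncio.run(
    dispatch(FakeClient(), "/2.1/jobs/create", HTTPMethod.POST, json={"name": "demo"})
)
```

Because the lookup is by name, passing a string that is not a method on the client would raise `AttributeError` in this sketch.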
Recursively serializes pydantic.BaseModel into JSON;
returns original obj if not a BaseModel.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| obj | Any | Input object to serialize. | required |

Returns:

| Type | Description |
|------|-------------|
| Any | Serialized version of object. |
Source code in prefect_databricks/rest.py
def serialize_model(obj: Any) -> Any:
    """
    Recursively serializes `pydantic.BaseModel` into JSON;
    returns original obj if not a `BaseModel`.

    Args:
        obj: Input object to serialize.

    Returns:
        Serialized version of object.
    """
    if isinstance(obj, list):
        return [serialize_model(o) for o in obj]
    elif isinstance(obj, Dict):
        return {k: serialize_model(v) for k, v in obj.items()}

    if isinstance(obj, MODELS):
        return model_dump(obj, mode="json")
    elif isinstance(obj, Enum):
        return obj.value
    return obj
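To illustrate the recursion over lists, dictionaries, and enums, here is a simplified standalone sketch. It deliberately omits the pydantic branch, so `serialize_plain` and `RunState` are hypothetical names for illustration and not part of `prefect_databricks`.

```python
from enum import Enum
from typing import Any


class RunState(Enum):
    # Hypothetical enum used only for this illustration.
    PENDING = "pending"
    DONE = "done"


def serialize_plain(obj: Any) -> Any:
    """Simplified stand-in: walks lists and dicts, converts enums to their values."""
    if isinstance(obj, list):
        return [serialize_plain(o) for o in obj]
    if isinstance(obj, dict):
        return {k: serialize_plain(v) for k, v in obj.items()}
    if isinstance(obj, Enum):
        return obj.value
    # Anything else is returned unchanged.
    return obj


payload = {
    "states": [RunState.PENDING, RunState.DONE],
    "meta": {"state": RunState.DONE, "retries": 3},
}
result = serialize_plain(payload)
```

Values that are neither containers nor enums, such as the integer `3` here, pass through untouched.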
Recursively drops keyword arguments if value is None,
and serializes any pydantic.BaseModel types.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| **kwargs | Dict | Input keyword arguments. | {} |

Returns:

| Type | Description |
|------|-------------|
| Dict | Stripped version of kwargs. |
Source code in prefect_databricks/rest.py
def strip_kwargs(**kwargs: Dict) -> Dict:
    """
    Recursively drops keyword arguments if value is None,
    and serializes any `pydantic.BaseModel` types.

    Args:
        **kwargs: Input keyword arguments.

    Returns:
        Stripped version of kwargs.
    """
    stripped_dict = {}
    for k, v in kwargs.items():
        v = serialize_model(v)
        if isinstance(v, dict):
            v = strip_kwargs(**v)
        if v is not None:
            stripped_dict[k] = v
    return stripped_dict or {}
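The None-dropping behaviour can be seen in isolation with a small standalone sketch. It mirrors the loop above but skips the serialization step, so `strip_none` is a hypothetical simplification and not the library function itself.

```python
from typing import Any, Dict


def strip_none(**kwargs: Any) -> Dict[str, Any]:
    """Simplified stand-in: recursively drops keys whose value is None."""
    stripped: Dict[str, Any] = {}
    for key, value in kwargs.items():
        if isinstance(value, dict):
            # Nested dicts are stripped too; their keys must be strings
            # because they are unpacked as keyword arguments.
            value = strip_none(**value)
        if value is not None:
            stripped[key] = value
    return stripped


params = {
    "limit": 5,
    "offset": None,
    "expand_tasks": True,
    "filter": {"name": "nightly", "owner": None},
}
result = strip_none(**params)
empty_nested = strip_none(options={"timeout": None})
```

In this sketch a nested dictionary whose values are all None collapses to an empty dictionary but its key is kept, since only None values are dropped. Falsy values such as `0` or `False` are also kept.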