Data Manager
Introduction
Interact with your Dataplant’s Data Manager.
Connect to your databases, APIs, streams, protocols, or data_store.
Examples
Datastore connect
from forepaas.dwh import connect
data_store = connect("data_store")
# Get the bucket "test" and list its content recursively
bucket_test = data_store.get_bucket("test")
lists = bucket_test.list(recursive=True)
# Upload an image from a URL to the path forepaas/test.jpg
bucket_test.put_request("https://i.stack.imgur.com/r8jTK.jpg", path="forepaas/test.jpg")
# And finally get the image from the bucket
# NOTE(review): this reads hello/test.jpg from the listing taken before the upload,
# not the forepaas/test.jpg path uploaded above - confirm this is intended
data = lists.get("hello/test.jpg")
# Create a bucket if it does not already exist
if data_store.bucket_exists("test-exists") is False:
    data_store.create_bucket("test-exists")
# Connect directly to the bucket test and remove the file
bucket_test2 = connect("data_store/test")
bucket_test2.delete("hello/test.jpg")
Connectors connect
import logging
from forepaas.dwh import connect
logger = logging.getLogger(__name__)
# Open the source connector for the chicago_calendar table
calendar_source = connect("dwh/data_prim/chicago_calendar")
# Extract a dataframe restricted by the condition, then log how many rows it holds
df = calendar_source.extract({"condition": "week_day = 1"})
logger.info(f"There are {len(df)} rows with week_day = 1 in the table chicago_calendar")
Protocols connect
import logging
from forepaas.dwh import connect
from forepaas.core.settings import CONFIG
import pandas as pd
logger = logging.getLogger(__name__)
filename = "chicago_calendar.csv"
# Connect to the protocol connector and download the file.
# The file is downloaded into the worker file directory (CONFIG["file_directory"])
file_connector = connect(f"dwh/chicago_files/{filename}")
file_connector.get()
# Load the downloaded CSV (semicolon-separated) from the worker file directory
local_path = f"{CONFIG['file_directory']}/{filename}"
df = pd.read_csv(local_path, sep=";")
# Report the column names and the number of rows
column_names = ", ".join(df.columns)
logger.info(f"File {filename} has columns {column_names} and it has {len(df)} rows")
Get raw
from forepaas.worker.protocol import get_raw
from forepaas.worker.connect import connect
# get_raw is called with a protocol connector instance,
# so connect to the file on the protocol connector first
protocol = connect("dwh/chicago_files/chicago_calendar.csv")
# Pass the protocol instance to get_raw and keep what it returns as the source
source = get_raw(protocol)