Data Manager

Introduction
Interact with your Dataplant’s Data Manager.
Connect to your databases, APIs, streams, protocols, and data store.

Examples

Datastore connect

from forepaas.dwh import connect

# Open a connection to the data store.
data_store = connect("data_store")

# Get the bucket named "test".
bucket_test = data_store.get_bucket("test")

# List the bucket contents recursively. Note that this listing is taken
# before the upload below.
lists = bucket_test.list(recursive=True)

# Upload the image found at the URL into the bucket at path forepaas/test.jpg.
bucket_test.put_request("https://i.stack.imgur.com/r8jTK.jpg", path="forepaas/test.jpg")
# Read hello/test.jpg from the listing result.
# NOTE(review): the upload above targets forepaas/test.jpg, while this line
# reads hello/test.jpg from the earlier listing -- confirm which path and
# which object (listing vs. bucket) the example is meant to use.
data = lists.get("hello/test.jpg")

# Create a bucket if it does not already exist
if data_store.bucket_exists("test-exists") is False:
    data_store.create_bucket("test-exists")

# Connect directly to the bucket "test" and remove the file hello/test.jpg
bucket_test2 = connect("data_store/test")
bucket_test2.delete("hello/test.jpg")

Connectors connect

import logging
from forepaas.dwh import connect


log = logging.getLogger(__name__)

# Open the connector for the chicago_calendar table.
calendar_table = connect("dwh/data_prim/chicago_calendar")

# Extract only the rows matching the condition, then log how many came back.
matching_rows = calendar_table.extract({"condition": "week_day = 1"})
row_count = len(matching_rows)
log.info(f"There are {row_count} rows with week_day = 1 in the table chicago_calendar")

Protocols connect

import logging
from forepaas.dwh import connect
from forepaas.core.settings import CONFIG
import pandas as pd

log = logging.getLogger(__name__)

csv_name = "chicago_calendar.csv"

# Download the file through the protocol connector.
# It is saved in the worker file directory (CONFIG["file_directory"]).
chicago_file = connect(f"dwh/chicago_files/{csv_name}")
chicago_file.get()

# Load the downloaded, semicolon-separated file and summarize it in the log.
local_path = f"{CONFIG['file_directory']}/{csv_name}"
calendar = pd.read_csv(local_path, sep=";")
header = ", ".join(calendar.columns)
row_count = len(calendar)
log.info(f"File {csv_name} has columns {header} and it has {row_count} rows")

Get raw

from forepaas.worker.protocol import get_raw
from forepaas.worker.connect import connect

# get_raw is given a protocol connector instance, so open one first.
# NOTE(review): the other examples on this page import connect from
# forepaas.dwh; confirm forepaas.worker.connect is the intended module here.
chicago_csv = connect("dwh/chicago_files/chicago_calendar.csv")
source = get_raw(chicago_csv)

API Reference