Data Manager

Introduction
Interact with your Dataplant’s Data Manager: connect to your databases, APIs, streams, protocols and data store.

Examples

Datastore connect

from forepaas.dwh import connect

data_store = connect("data_store")

# Get the "test" bucket and list every object in it, including sub-folders
bucket_test = data_store.get_bucket("test")

lists = bucket_test.list(recursive=True)

# Upload the image found at the URL to forepaas/test.jpg, then get it back from the bucket
bucket_test.put_request("https://i.stack.imgur.com/r8jTK.jpg", path="forepaas/test.jpg")
data = bucket_test.get("forepaas/test.jpg")

# Create a bucket if it does not already exist
if not data_store.bucket_exists("test-exists"):
    data_store.create_bucket("test-exists")

# Connect directly to the "test" bucket and remove the uploaded file
bucket_test2 = connect("data_store/test")
bucket_test2.delete("forepaas/test.jpg")

Connectors connect

import logging
from forepaas.dwh import connect


logger = logging.getLogger(__name__)

# connect to the source connector
source = connect("dwh/data_prim/chicago_calendar")

# Extract the rows matching the condition from the source into a DataFrame
extract_parameters = {"condition": "week_day = 1"}
df = source.extract(extract_parameters)
logger.info(f"There are {len(df)} rows with week_day = 1 in the table chicago_calendar")

Protocols connect

import logging
from forepaas.dwh import connect
from forepaas.core.settings import CONFIG
import pandas as pd

logger = logging.getLogger(__name__)

# Connect to the protocol connector and download the file.
# The file is downloaded into the worker's file directory (CONFIG["file_directory"])
filename = "chicago_calendar.csv"
protocol = connect(f"dwh/chicago_files/{filename}")
protocol.get()

df = pd.read_csv(f"{CONFIG['file_directory']}/{filename}", sep=";")
column_names = ", ".join(list(df.columns))
logger.info(f"File {filename} has columns {column_names} and it has {len(df)} rows")

Get raw

from forepaas.worker.protocol import get_raw
from forepaas.worker.connect import connect

# You will need a protocol instance to extract from
protocol = connect("dwh/chicago_files/chicago_calendar.csv")
source = get_raw(protocol)

API Reference

class forepaas.dwh.Attributes

Bases: object

list()
list_table_attribute(database, table)
class forepaas.dwh.AzureBlobStorage(params)

Bases: Datastore

AzureBlobStorage is a Datastore used to interact with the ForePaaS Datastore (fp-datastore) through Azure Blob Storage.

Example:
>>> data_store = AzureBlobStorage(config)
>>> bucket = data_store.get_bucket(BUCKET_NAME)

Parameters:
  • params – dict - params to connect to fp-datastore. Example values: localhost:9000, ROOT, ROOT and True; params["secure"] indicates whether the datastore is SSL-secured.

Attributes:

azure – AzureBlobStorage instance used to handle requests to fp-datastore

bucket_exists(name)

Find out whether a bucket exists

Parameters:

name – string - Bucket name

Returns:

bool - True if the bucket exists, False otherwise

create_bucket(name)

Method to add a bucket to the fp-datastore

Parameters:

name – string - Bucket name

Returns:

bool - Success of operation

get_bucket(name)

Method to get a bucket instance from its name

Parameters:

name – string - Bucket name

Returns:

Bucket - Bucket instance to handle files

get_buckets()

Method that gets all buckets from fp-datastore

Returns:

list - containing the Bucket instances

get_hadoop_config()

Hadoop config for PySpark

Returns:

Hadoop config

Return type:

dict

get_spark_session_config()

Spark session config for PySpark

Returns:

Spark session config

Return type:

dict

list(return_type='array')

Method to list buckets from fp-datastore

Parameters:

return_type – string - Type of the result: “array” or “str”

Returns:

str or array - Contains information about buckets on fp-datastore

message_from_error(func, error)
remove_bucket(name)

Method to delete a bucket in the fp-datastore

Parameters:

name – string - Bucket name

Returns:

bool - Success of operation

remove_bucket_not_empty(name)

Method to delete a non-empty bucket in the fp-datastore

Parameters:

name – string - Bucket name

Returns:

bool - Success of operation

spark_prefix(bucket)
class forepaas.dwh.AzureBucket(client, name)

Bases: object

AzureBucket is an object to interact with a specific bucket of the ForePaaS Datastore (fp-datastore).

Example:
>>> client = azure.storage.blob.BlobServiceClient.from_connection_string(connection_string)
>>> BUCKET_NAME = "example_bucket"
>>> bucket = AzureBucket(client, BUCKET_NAME)

Parameters:
  • client – Instance of azure.storage.blob.BlobServiceClient

  • name – Name of the bucket

Attributes:
  • azure – Instance of azure.storage.blob.BlobServiceClient

  • name – Name of the bucket

delete(object_name)

Delete one or more files from the fp-datastore’s bucket

Parameters:

object_name (str or array) – Name of the object(s) to delete

fcopy_to(name, object_name, object_source, **kwargs)

Copy a file from this bucket to another bucket in fp-datastore

Parameters:
  • name (str) – Name of the bucket for the new object.

  • object_name (str) – Name of the new object.

  • object_source (str) – Name of the object to be copied.

  • **kwargs – remaining keyword arguments are passed to self.client.copy_object

fget(object_name, file_path, **kwargs)

Download an object from the fp-datastore’s bucket to a local path

Parameters:
  • object_name (str) – Name of the object to get.

  • file_path (str) – Path on the local filesystem to which the object data will be written.

  • **kwargs – remaining keyword arguments are passed to self.client.fget_object

Returns:

Object stat info from the server

Return type:

Object

fput(object_name, file_path, **kwargs)

Upload a file to the fp-datastore’s bucket

Parameters:
  • object_name (str) – Name of the object to upload

  • file_path (str) – Path on the local filesystem from which the object data will be read.

  • **kwargs – remaining keyword arguments are passed to self.client.fput_object

Returns:

Object etag computed by the server

Return type:

str

get(file_name, **kwargs)

Get an object from the fp-datastore’s bucket by file name

Parameters:
  • file_name (str) – Name of the file to get from the bucket

  • **kwargs – remaining keyword arguments are passed to self.client.get_object

Returns:

Response from the fp-datastore request

Return type:

urllib3.response.HTTPResponse

Example:
>>> bucket.get('folder/file_name')
>>> data = bucket.get('file_name')
>>> for d in data.stream(32*1024):
...     sdk.log("info", d)

get_content(file_name, **kwargs)

Get the content of a file from the fp-datastore’s bucket

Parameters:
  • file_name (str) – Name of the file to get from the bucket

  • **kwargs – remaining keyword arguments are passed to self.client.get_object

Returns:

Content of the file

Return type:

str

list(metadata=False, recursive=True, **kwargs)

List files from the fp-datastore’s bucket

Parameters:
  • metadata (bool, optional) – Get metadata for all files listed, defaults to False

  • recursive (bool, optional) – Whether to also list the files in sub-folders, defaults to True

  • **kwargs – remaining keyword arguments are passed to self.client.list_objects_v2

Raises:

Exception

Returns:

List of bucket files

Return type:

[Object]

list_filename(return_type='array', contains='', recursive=True, **kwargs)

List the file names in the fp-datastore’s bucket

Parameters:
  • return_type (str, optional) – Type of the result, defaults to ‘array’

  • contains (str, optional) – Keep only the file names containing this value, defaults to ‘’

  • recursive (bool, optional) – Whether to also list the files in sub-folders, defaults to True

  • **kwargs – remaining keyword arguments are passed to self.client.list_objects_v2

Raises:

Exception

Returns:

List of bucket files

Return type:

[array, str]

listen_notification(path=None, suffix=None, events=None)

Listen for notifications on a bucket. Filters can optionally be provided for the object key prefix (path), the suffix and the event types. No bucket notification needs to be configured beforehand to use this API.

Parameters:
  • path (str) – Object key prefix to filter notifications for

  • suffix (str) – Object key suffix to filter notifications for

  • events (list) – Enable notifications for specific event types

Returns:

Iterator of azure blob storage event

Return type:

iterator

message_from_error(func, error)
put(object_name, data, length=None, **kwargs)

Upload an object to the fp-datastore’s bucket

Parameters:
  • object_name (str) – Name of the object to upload

  • data (io.RawIOBase) – Data to upload to Azure Blob Storage

  • length (int) – Data length

  • **kwargs – remaining keyword arguments are passed to self.client.put_object

Returns:

Object etag computed by the server

Return type:

str

put_request(url, path='', data=None, method='GET', headers=None)

Fetch an object with an HTTP request and upload it to the fp-datastore’s bucket

Parameters:
  • url (str) – URL to get the file from

  • path (str, optional) – Where to upload the file in the fp-datastore’s bucket, defaults to ‘’

  • data – (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the request.

  • method (str, optional) – Method used by the request to get the file, defaults to ‘GET’

  • headers (dict, optional) – Headers sent with the request, defaults to {}

stat(object_name, **kwargs)

Get stat information for a file in the fp-datastore’s bucket

Parameters:
  • object_name (str or array) – Name of the object to get stat information for

  • **kwargs – remaining keyword arguments are passed to self.client.stat_object

class forepaas.dwh.Bucket(client, name, bucket_type='s3')

Bases: object

Bucket is an object to interact with a specific Bucket from any ForePaas compatible Datastore

Parameters:
  • client – Instance of client

  • name – Name of the bucket

Example:
>>> client = boto3.client("s3", endpoint_url="http://localhost:9000")
>>> BUCKET_NAME = "example_bucket"
>>> bucket = Bucket(client, BUCKET_NAME)

delete(object_name)

Delete one or more files from the store’s bucket

Parameters:

object_name (str or array) – Name of the object(s) to delete

fcopy_to(bucket_name, object_name, object_source, **kwargs)

Copy a file from this bucket to another bucket in fp-datastore

Parameters:
  • bucket_name (str) – Name of the bucket for new object.

  • object_name (str) – Name of the new object.

  • object_source (str) – Name of the object to be copied.

  • **kwargs – remaining keyword arguments are passed to provider copy objects function

fget(object_name, file_path, **kwargs)

Download an object from the store’s bucket to a local path

Parameters:
  • object_name (str) – Name of the object to get.

  • file_path (str) – Path on the local filesystem to which the object data will be written.

  • **kwargs – remaining keyword arguments are passed to provider fget function

Returns:

Object stat info from the server

Return type:

Object

fput(object_name, file_path, **kwargs)

Upload a file to the store’s bucket

Parameters:
  • object_name (str) – Name of the object to upload

  • file_path (str) – Path on the local filesystem from which object data will be read.

  • **kwargs – remaining keyword arguments are passed to provider fput function

Returns:

Object etag computed by the server

Return type:

str

get(file_name, **kwargs)

Get an object from the store’s bucket by file name

Parameters:
  • file_name (str) – File name from bucket you want to get

  • **kwargs – remaining keyword arguments are passed to provider get object function

Returns:

Content of the file

Return type:

Streamer

Example:
>>> bucket.get('folder/file_name')
>>> data = bucket.get('file_name')

get_content(file_name, **kwargs)

Get the content of a file from the store’s bucket

Parameters:
  • file_name (str) – Name of the file to get from the bucket

  • **kwargs – remaining keyword arguments are passed to provider get object function

Returns:

Content of the file

Return type:

str

list(metadata=True, recursive=True, **kwargs)

List files from the store’s bucket

Parameters:
  • metadata (bool, optional) – Get metadata for all files listed, defaults to True

  • recursive (bool, optional) – Whether to also list the files in sub-folders, defaults to True

  • **kwargs – remaining keyword arguments are passed to provider function to list files

Raises:

Exception

Returns:

List of bucket files

Return type:

[Object]

list_filename(return_type='array', contains='', recursive=True, **kwargs)

List the file names in the store’s bucket

Parameters:
  • return_type (str, optional) – Type of the result, defaults to ‘array’

  • contains (str, optional) – Keep only the file names containing this value, defaults to ‘’

  • recursive (bool, optional) – Whether to also list the files in sub-folders, defaults to True

  • **kwargs – remaining keyword arguments are passed to provider function to list files

Raises:

Exception

Returns:

List of bucket files

Return type:

[array, str]
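The `contains` filter and the two return types of `list_filename` can be pictured with the minimal sketch below. It works on a plain in-memory list of names instead of a real bucket, and the function name `filter_filenames` is hypothetical. Two behaviours are assumptions not stated in this reference: that `contains` is a plain substring test, and that the "str" return type joins the names with newlines.

```python
from typing import List, Union


def filter_filenames(
    names: List[str], return_type: str = "array", contains: str = ""
) -> Union[List[str], str]:
    """Hypothetical stand-in for Bucket.list_filename on an in-memory list."""
    # Keep only the names containing the substring; "" matches every name.
    matched = [name for name in names if contains in name]
    if return_type == "array":
        return matched
    if return_type == "str":
        # Assumption: the "str" form joins the names with newlines.
        return "\n".join(matched)
    raise ValueError(f"unsupported return_type: {return_type!r}")


names = ["forepaas/test.jpg", "forepaas/notes.txt", "hello/test.jpg"]
jpg_names = filter_filenames(names, contains="test")
txt_listing = filter_filenames(names, return_type="str", contains=".txt")
print(jpg_names)
print(txt_listing)
```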

listen_notification(path=None, suffix=None, events=None)

Listen for notifications on a bucket. Filters can optionally be provided for the object key prefix (path), the suffix and the event types. No bucket notification needs to be configured beforehand to use this API.

Parameters:
  • path (str) – Object key prefix to filter notifications for

  • suffix (str) – Object key suffix to filter notifications for

  • events (list) – Enable notifications for specific event types

Returns:

Iterator of event

Return type:

iterator
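Because `listen_notification` returns an iterator, matching events can be consumed lazily in a `for` loop. The sketch below mimics the `path`, `suffix` and `events` filters over an in-memory list; the event dictionary shape (`key`, `event`), the S3-style event names and the function name `filter_notifications` are assumptions for illustration only, not the format this API is documented to yield.

```python
from typing import Dict, Iterable, Iterator, List, Optional


def filter_notifications(
    events_stream: Iterable[Dict[str, str]],
    path: Optional[str] = None,
    suffix: Optional[str] = None,
    events: Optional[List[str]] = None,
) -> Iterator[Dict[str, str]]:
    """Hypothetical sketch: lazily yield only the events matching every filter."""
    for event in events_stream:
        key = event["key"]
        if path is not None and not key.startswith(path):
            continue  # object is outside the watched prefix
        if suffix is not None and not key.endswith(suffix):
            continue  # object does not have the watched suffix
        if events is not None and event["event"] not in events:
            continue  # event type was not requested
        yield event


incoming = [
    {"key": "forepaas/test.jpg", "event": "s3:ObjectCreated:Put"},
    {"key": "forepaas/notes.txt", "event": "s3:ObjectCreated:Put"},
    {"key": "hello/test.jpg", "event": "s3:ObjectRemoved:Delete"},
]
for notification in filter_notifications(incoming, path="forepaas/", suffix=".jpg"):
    print(notification["key"], notification["event"])
```

A generator keeps memory use constant however long the notification stream runs, which matters when listening indefinitely.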

put(object_name, data, length=None, **kwargs)

Upload an object to the store’s bucket

Parameters:
  • object_name (str) – Name of the object to upload

  • data (str) – Data to upload to data store

  • length (int) – Data length

  • **kwargs – remaining keyword arguments are passed to provider put object function

Returns:

Object etag computed by the server

Return type:

str

put_request(url, path='', data=None, method='GET', headers=None)

Fetch an object with an HTTP request and upload it to the store’s bucket

Parameters:
  • url (str) – URL to get the file from

  • path (str, optional) – Where to upload the file in the store’s bucket, defaults to ‘’

  • data – (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the request.

  • method (str, optional) – Method used by the request to get the file, defaults to ‘GET’

  • headers (dict, optional) – Headers sent with the request, defaults to {}
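Conceptually, `put_request` chains an HTTP request with an upload. The sketch below shows that flow with the standard library's `urllib.request` and a hypothetical `InMemoryBucket` whose `put` mirrors the `put(object_name, data, length)` signature documented above. It is not the ForePaaS implementation: `put_request_sketch` is a hypothetical name, it omits the `data` body parameter, and it requires an explicit `path` instead of reproducing the `''` default.

```python
import io
import urllib.request
from typing import Dict, Optional


class InMemoryBucket:
    """Hypothetical stand-in exposing put(object_name, data, length)."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}

    def put(self, object_name: str, data: io.BufferedIOBase,
            length: Optional[int] = None) -> None:
        # Read the whole stream when no length is given.
        self.objects[object_name] = data.read() if length is None else data.read(length)


def put_request_sketch(bucket, url: str, path: str, method: str = "GET",
                       headers: Optional[Dict[str, str]] = None) -> None:
    """Fetch `url` and upload the response body to `path` in `bucket`."""
    request = urllib.request.Request(url, headers=headers or {}, method=method)
    with urllib.request.urlopen(request) as response:
        body = response.read()
    # Wrap the bytes in a file-like object, as a streaming put() expects.
    bucket.put(path, io.BytesIO(body), length=len(body))


bucket = InMemoryBucket()
# A data: URL keeps this example offline; a real call would use an http(s) URL.
put_request_sketch(bucket, "data:text/plain;base64,aGVsbG8=", path="forepaas/hello.txt")
print(bucket.objects["forepaas/hello.txt"])
```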

stat(object_name, **kwargs)

Get stat information for a file in the store’s bucket

Parameters:
  • object_name (str or array) – Name of the object to get stat

  • **kwargs – remaining keyword arguments are passed to provider object’s stat function

exception forepaas.dwh.BucketNotFound

Bases: Exception

The bucket does not exist in the data store

exception forepaas.dwh.BucketUnauthorized

Bases: Exception

The bucket belongs to the Dataplant system and must not be accessed

class forepaas.dwh.Connector(configuration, params)

Bases: object

Class to handle Connectors

Parameters:
  • configuration (dict) – Configuration sent by the operator containing dwh information about sources and tables

  • params (dict) – Action params configured in the control plane

configure(*args, **kwargs)
delete(*args, **kwargs)
escape_single_quote(str)

Escape single quotes

Parameters:

str – string to escape

Returns:

escaped string
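This reference does not say which escaping convention `escape_single_quote` applies. The sketch below assumes the SQL-standard convention of doubling each single quote, which makes a value safe to embed in a single-quoted literal such as an extraction `condition`; some databases use a backslash instead, so check the behaviour for your target database. The function here is a standalone stand-in, not the Connector method itself.

```python
def escape_single_quote(value: str) -> str:
    """Hypothetical sketch: double each single quote (SQL-standard escaping)."""
    return value.replace("'", "''")


station = "O'Hare"
# Build a condition string like the one passed to extract() in the connector example.
condition = f"station = '{escape_single_quote(station)}'"
print(condition)
```

Where the database driver supports bound parameters, prefer them over manual escaping.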

extract(*args, **kwargs)
get_endpoint_parameter(key, default_value=None)

Search for a parameter by key: first in the job params, then in table_dwh

Parameters:
  • key (str) – Key to search for

  • default_value (mixed, optional) – Default value if the key is not found, defaults to None

Returns:

Value found

Return type:

mixed

get_error(class_name, func, err, _file)
get_limit(limit=-1)

Get the limit of the connector

Parameters:

limit – limit of the connector

Returns:

limit of the connector

get_parameter(key, default_value=None)

Search for a parameter by key: first in the job params, then in configuration[“parameters”] and in configuration. If the key is still not found, look for it in table_dwh

Parameters:
  • key (str) – Key to search for

  • default_value (mixed, optional) – Default value if the key is not found, defaults to None

Returns:

Value found

Return type:

mixed
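The lookup order described for `get_parameter` can be sketched as a chain of dictionaries checked in turn. This is a minimal standalone illustration, not the Connector method: the real method reads from the connector's own attributes, and two details are assumptions here, namely that a key explicitly set to `None` counts as found, and that `table_dwh` is searched only at its top level (the nested table_dwh parameters that `get_table_parameter` mentions are ignored).

```python
from typing import Any, Dict

# Sentinel so that a key explicitly set to None still counts as "found".
_MISSING = object()


def get_parameter(key: str, params: Dict[str, Any], configuration: Dict[str, Any],
                  table_dwh: Dict[str, Any], default_value: Any = None) -> Any:
    """Hypothetical sketch of the lookup order documented for get_parameter."""
    sources = (
        params,                               # 1. params from the job
        configuration.get("parameters", {}),  # 2. configuration["parameters"]
        configuration,                        # 3. configuration itself
        table_dwh,                            # 4. table_dwh as a last resort
    )
    for source in sources:
        value = source.get(key, _MISSING)
        if value is not _MISSING:
            return value
    return default_value


params = {"limit": 100}
configuration = {"parameters": {"limit": 10, "sep": ";"}, "encoding": "utf-8"}
table_dwh = {"sep": ",", "schema": "public"}

limit = get_parameter("limit", params, configuration, table_dwh)        # job params win
sep = get_parameter("sep", params, configuration, table_dwh)            # configuration["parameters"]
encoding = get_parameter("encoding", params, configuration, table_dwh)  # configuration
schema = get_parameter("schema", params, configuration, table_dwh)      # table_dwh
missing = get_parameter("missing", params, configuration, table_dwh, default_value="n/a")
```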

get_perimeter_query(params)

Get the perimeter query

Parameters:

params – params of the action

Returns:

perimeter query

get_return_type(return_type='')

Get the return type of the connector

Parameters:

return_type – return type of the connector

Returns:

return type of the connector

get_segmentation_query(params)

Get the segmentation query

Parameters:

params – params of the action

Returns:

segmentation query

get_source_default_schema()

Get source default schema

Returns:

source default schema

Return type:

dict

get_table_parameter(key, default_value=None)

Search for a parameter by key in table_dwh or in the table_dwh parameters

Parameters:
  • key (str) – Key to search for

  • default_value (mixed, optional) – Default value if the key is not found, defaults to None

Returns:

Value found

Return type:

mixed

handle_api_extraction(lines, limit=-1, return_type='dict')

The result of this function should be returned at the end of the extract method of an API-type connector (Facebook, GA, YouTube…). It adds a log to the account in the Data Manager, then returns the lines, limited according to the limit parameter.

Parameters:

lines (list(dict)) – Extracted lines from api

Returns:

Extracted lines limited or not

Return type:

list(dict)
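The limiting step of `handle_api_extraction` can be sketched as a slice. This standalone `apply_limit` function is hypothetical and assumes that the default `limit=-1` means "no limit"; it leaves out the account logging the method also performs.

```python
from typing import Any, Dict, List


def apply_limit(lines: List[Dict[str, Any]], limit: int = -1) -> List[Dict[str, Any]]:
    """Hypothetical sketch: all lines when limit is negative, else the first `limit`."""
    if limit < 0:
        return lines
    return lines[:limit]


lines = [{"id": 1}, {"id": 2}, {"id": 3}]
all_lines = apply_limit(lines)
first_two = apply_limit(lines, limit=2)
```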

handle_query_cursor(cursor, return_type='')

Handle query cursor

Parameters:
  • cursor – cursor of the query

  • return_type – return type of the connector

Returns:

query result

insert(*args, **kwargs)
linearize(data, params)

Method used to linearize data from objects that may each contain multiple lines

Parameters:

data (dict or list) – data to linearize

Returns:

list of linearized data

Return type:

list

linearize_no_main_node(data, params)

Method used to linearize data from objects that may each contain multiple lines

Parameters:

data (dict or list) – data to linearize

Returns:

list of linearized data

Return type:

list
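The general idea of linearizing, turning objects that each hold several lines into one flat row per line, can be sketched as follows. This is a hypothetical illustration, not the behaviour of `linearize` or `linearize_no_main_node`: the real methods take a `params` argument whose content is not documented here, whereas this sketch assumes the nested lines sit in a list under a key passed as `node`, and that an object without nested lines still yields one row.

```python
from typing import Any, Dict, List, Union


def linearize_sketch(data: Union[Dict[str, Any], List[Dict[str, Any]]],
                     node: str) -> List[Dict[str, Any]]:
    """Hypothetical sketch: emit one flat row per item nested under `node`."""
    objects = data if isinstance(data, list) else [data]
    rows: List[Dict[str, Any]] = []
    for obj in objects:
        # Every field except the nested one is copied onto each output row.
        parent = {key: value for key, value in obj.items() if key != node}
        children = obj.get(node) or []
        if not children:
            rows.append(parent)  # an object without nested lines gives one row
        for child in children:
            rows.append({**parent, **child})
    return rows


report = {
    "campaign": "spring",
    "lines": [{"day": "monday", "clicks": 12}, {"day": "tuesday", "clicks": 7}],
}
rows = linearize_sketch(report, node="lines")
```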

load(*args, **kwargs)
merge_conditions(condition, cond)
select(*args, **kwargs)
update(*args, **kwargs)
class forepaas.dwh.DM

Bases: object

Class to handle Data Manager operations

Parameters:
  • conf (dict) – Configuration sent by the operator containing dwh information about sources and tables

  • params (dict) – Action params configured in the control plane

add_account_logstatus(status, code, message='', account_key=None)

Add a log status for an account

Parameters:
  • status (str) – Database account status

  • code (str) – Status code

  • message (str, optional) – Message

  • account_key (str, optional) – Account key

get_account_key_conf(connection_str, account_key)

Get the account key configuration

Parameters:
  • connection_str (str) – connection string

  • account_key (str) – account key

Raises:
  • Exception – dbname not found in configuration: …

  • Exception – no accounts found in configuration: …

  • Exception – account_key: … not found in configuration: …

Returns:

account key conf

Return type:

dict

get_db_account_keys(connection_str)

Get db account keys

Parameters:

connection_str (str) – connection string

Returns:

db account_keys

Return type:

list

get_source_default_schema(dwh_db=None)

Get source default schema

Parameters:

dwh_db (dict, optional) – DWH databases, defaults to None

Returns:

source default schema

Return type:

dict

push_account_logstatus(db_name)
class forepaas.dwh.DataTransformer

Bases: object

Utility class used to apply rules to the source data

Class variable rule_to_function:

dict - mapping the rule to corresponding function

Class variable condition_to_operator:

dict - mapping the condition to the corresponding operator

apply_rules(rules, data, error_rules)

Method that filters the dataframe using the conditions of the rules

Parameters:
  • rules – list - list of rules to apply

  • data – dataframe - containing the data
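The `condition_to_operator` class variable suggests that rule conditions are mapped to comparison functions and applied row by row. The sketch below illustrates that pattern with the standard `operator` module on a plain list of dictionaries instead of a dataframe. The rule shape (`column`, `condition`, `value`), the mapping contents and the name `apply_rules_sketch` are hypothetical; returning the rejected rows separately is a design choice of this sketch, not documented behaviour of `apply_rules` or its `error_rules` argument.

```python
import operator
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical mapping in the spirit of DataTransformer.condition_to_operator.
CONDITION_TO_OPERATOR: Dict[str, Callable[[Any, Any], bool]] = {
    "=": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
}


def apply_rules_sketch(rules: List[Dict[str, Any]], rows: List[Dict[str, Any]]
                       ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Split rows into (kept, rejected): a row is kept only if every rule holds."""
    kept: List[Dict[str, Any]] = []
    rejected: List[Dict[str, Any]] = []
    for row in rows:
        ok = all(
            CONDITION_TO_OPERATOR[rule["condition"]](row[rule["column"]], rule["value"])
            for rule in rules
        )
        (kept if ok else rejected).append(row)
    return kept, rejected


rules = [{"column": "week_day", "condition": "=", "value": 1}]
rows = [{"id": 1, "week_day": 1}, {"id": 2, "week_day": 2}]
kept, rejected = apply_rules_sketch(rules, rows)
```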

apply_schema(data, extractor_type, source_default_schema=None)

Method that applies the schema to the data

Parameters:
  • data – dataframe - containing the data

  • extractor_type – str - type of the extractor

  • source_default_schema – dict - default schema of the source

Returns:

dataframe - containing the data with the schema applied

get_schema()

Method that returns the schema of the data

Returns:

dict - schema of the data

time_delta_to_datetime(data)

Method that transforms the timedelta columns to datetime

Parameters:

data – dataframe - containing the data

Returns:

dataframe - containing the data with the timedelta columns transformed to datetime

class forepaas.dwh.Database

Bases: object

attribute_rules_delete(attribute_id)
create(content)
database_tables(database, table)
get(database)
list()
remove(database)
rules_create(rule)
table(database, table)
table_attribute(database, table, attribute_name)
table_attribute_delete(database, table, attribute_id)
table_attribute_update(database, table, attribute)
table_attributes(database, table)
table_attributes_create(database, table, data)
table_attributes_delete(database, table)
table_attributes_update(database, table, data)
table_rules_list(database, table)
table_update(database, table, data)
tables()
update(database, content)
class forepaas.dwh.Datalake

Bases: object

Class to handle Datalake

Parameters:
  • configuration (dict) – Configuration sent by the operator containing dwh information about sources and tables

  • params (dict) – Action params configured in the control plane

put_in_datalake(connector, content=None)

Put the file in the datalake

Parameters:
  • connector (Connector) – Connector to put

  • content (str) – Content to put in the file

search_datalake_path(connector)

Find the datalake path of the file to get

Parameters:

connector (Connector) – Connector to get

class forepaas.dwh.Datastore(params)

Bases: object

Datastore is an object to interact with any ForePaaS-compatible datastore (S3, GoogleStore, Azure Blob Storage)

Parameters:

params – dict - params to connect to the client store

Example:
>>> data_store = Datastore(config)
>>> bucket = data_store.get_bucket(BUCKET_NAME)

bucket_exists(name)

Find out whether a bucket exists

Parameters:

name – string - Bucket name

Returns:

bool - True if the bucket exists, False otherwise

client = None
create_bucket(name)

Method to add a bucket to the datastore

Parameters:

name – string - Bucket name

Returns:

bool - Success of operation

get_bucket(name)

Method to get a bucket instance from name

Parameters:

name – string - Bucket name

Returns:

Bucket - Bucket instance to handle files

get_buckets()

Method that gets all buckets from the datastore

Returns:

list - containing the instance of Bucket

list(return_type='array')

Method to list buckets from the datastore

Parameters:

return_type – string - Type of the result: “array” or “str”

Returns:

str or array - Contains information about buckets on the datastore

remove_bucket(name)

Method to delete a bucket in the datastore

Parameters:

name – string - Bucket name

Returns:

bool - Success of operation

remove_bucket_not_empty(name)

Method to delete a non-empty bucket in the datastore

Parameters:

name – string - Bucket name

Returns:

bool - Success of operation

exception forepaas.dwh.DwhRequestException(response)

Bases: Exception

class forepaas.dwh.Factory(connection_string, _type)

Bases: object

Class to instantiate a connector or protocol

Parameters:
  • connection_string (str) – Connection string used to get the corresponding module adapter.connection/source

  • _type (str) – Type of connector

  • params (dict) – Action params configured in the control plane

decrypt_secret(content)
get_module(cls)

Import module using ClassName and ModulePath in registered modules

Raises:

Exception – Import module error

Returns:

Module Class

Return type:

Class

get_params()

Add source and original_source to params if they are missing, so that the protocol/connector works properly

Returns:

Action parameters

Return type:

dict

instanciate(configure=True)

Match the connection_string instance_name against the imported modules, then instantiate the class, configure it and return it

Returns:

Class instance from the connection string

Return type:

Instance

register_forepaas_modules()

Register all connector and protocol modules inside the forepaas folder

Returns:

Dictionary of all modules with their modulePath and className

Return type:

dict
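The Factory methods above describe a registry-based pattern: a dictionary of modules with their `modulePath` and `className`, a lookup that imports the module, and instantiation of the class found. The sketch below shows that pattern with the standard `importlib` module. The registry entries point at standard-library classes purely so the example runs anywhere; the real registry holds ForePaaS connectors and protocols keyed by connection strings, and the real `instanciate` also calls `configure` on the instance, which this sketch omits.

```python
import importlib
from typing import Any, Dict

# Hypothetical registry in the shape described for register_forepaas_modules():
# each entry maps a name to its modulePath and className.
REGISTRY: Dict[str, Dict[str, str]] = {
    "ordered_dict": {"modulePath": "collections", "className": "OrderedDict"},
    "decimal": {"modulePath": "decimal", "className": "Decimal"},
}


def get_module(name: str, registry: Dict[str, Dict[str, str]] = REGISTRY) -> Any:
    """Import the module recorded for `name` and return the registered class."""
    try:
        entry = registry[name]
        module = importlib.import_module(entry["modulePath"])
        return getattr(module, entry["className"])
    except (KeyError, ImportError, AttributeError) as err:
        raise Exception(f"Import module error for {name!r}: {err}") from err


def instanciate(name: str, *args: Any, **kwargs: Any) -> Any:
    """Resolve the class through the registry, then create an instance of it."""
    cls = get_module(name)
    return cls(*args, **kwargs)


value = instanciate("decimal", "1.5")
print(type(value).__name__, value)
```

Storing module paths as strings and importing lazily means a connector's dependencies are only loaded when that connector is actually requested.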

class forepaas.dwh.Flashzone

Bases: object

delete(database, table_name)
get(database, table_name)
put(database, table_name, data)
class forepaas.dwh.GoogleBucket(client, name)

Bases: object

GoogleBucket is an object to interact with a specific bucket of the ForePaaS Datastore (Google store)

Parameters:
  • client – Google storage client

  • name – Name of the bucket

Example:
>>> client = google.cloud.storage.Client()
>>> BUCKET_NAME = "example_bucket"
>>> bucket = GoogleBucket(client, BUCKET_NAME)

delete(object_name)

Delete one or more files from the Google store’s bucket

Parameters:

object_name (str or array) – Name of the object(s) to delete

fcopy_to(name, object_name, object_source, **kwargs)

Copy a file from this bucket to another bucket in fp-datastore

Parameters:
  • name (str) – Name of the bucket for the new object.

  • object_name (str) – Name of the new object.

  • object_source (str) – Name of the object to be copied.

  • **kwargs – remaining keyword arguments are passed to blob.rewrite

fget(object_name, file_path, **kwargs)

Download an object from the Google store’s bucket to a local path

Parameters:
  • object_name (str) – Name of the object to get.

  • file_path (str) – Path on the local filesystem to which the object data will be written.

  • **kwargs – remaining keyword arguments are passed to blob.download_to_filename

Returns:

Object stat info from the server

Return type:

Object

fput(object_name, file_path, metadata=None, **kwargs)

Upload a file to the Google store’s bucket

Parameters:
  • object_name (str) – Name of the file to upload

  • file_path (str) – Path on the local filesystem from which the file will be read.

  • **kwargs – remaining keyword arguments are passed to blob.upload_from_filename

Returns:

Object etag computed by the server

Return type:

str

get(file_name, **kwargs)

Get an object from the Google store’s bucket by file name

Parameters:
  • file_name (str) – Name of the file to get from the bucket

  • **kwargs – remaining keyword arguments are passed to blob.download_as_string

Returns:

Content of the file

Return type:

Streamer

Example:
>>> bucket.get('folder/file_name')
>>> data = bucket.get('file_name')

get_content(file_name, **kwargs)

Get the content of a file from the Google store’s bucket

Parameters:
  • file_name (str) – Name of the file to get from the bucket

  • **kwargs – remaining keyword arguments are passed to blob.download_as_string

Returns:

Content of the file

Return type:

str

list(metadata=False, **kwargs)

List files from the Google store’s bucket

Parameters:
  • metadata (bool, optional) – Get metadata for all files listed, defaults to False

  • **kwargs – remaining keyword arguments are passed to bucket.list_blobs

Raises:

Exception

Returns:

List of bucket files

Return type:

[Object]

list_filename(return_type='array', contains='', **kwargs)

List the file names in the Google store’s bucket

Parameters:
  • return_type (str, optional) – Type of the result, defaults to ‘array’

  • contains (str, optional) – Keep only the file names containing this value, defaults to ‘’

  • **kwargs – remaining keyword arguments are passed to self.client.list_blobs

Raises:

Exception

Returns:

List of bucket files

Return type:

[array, str]

listen_notification(path=None, suffix=None, events=None)

Listen for notifications on a bucket. Filters can optionally be provided for the object key prefix (path), the suffix and the event types. No bucket notification needs to be configured beforehand to use this API.

Parameters:
  • path (str) – Object key prefix to filter notifications for

  • suffix (str) – Object key suffix to filter notifications for

  • events (list) – Enable notifications for specific event types

Returns:

Iterator of event

Return type:

iterator

message_from_error(func, error)
put(object_name, data, length=None, **kwargs)

Upload an object to the Google store’s bucket

Parameters:
  • object_name (str) – Name of the object to upload

  • data (str) – Data to upload to the Google store

  • length (int) – Data length

  • **kwargs – remaining keyword arguments are passed to blob.upload_from_file

Returns:

Object etag computed by the server

Return type:

str

put_request(url, path='', data=None, method='GET', headers=None)

Fetch an object with an HTTP request and upload it to the Google store’s bucket

Parameters:
  • url (str) – URL to get the file from

  • path (str, optional) – Where to upload the file in the Google store’s bucket, defaults to ‘’

  • data – (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the request.

  • method (str, optional) – Method used by the request to get the file, defaults to ‘GET’

  • headers (dict, optional) – Headers sent with the request, defaults to {}

class forepaas.dwh.GoogleStore(params)

Bases: Datastore

bucket_exists(name)

Find out whether a bucket exists

Parameters:

name – string - Bucket name

Returns:

bool - True if the bucket exists, False otherwise

create_bucket(name)

Method to add a bucket to the Google store

Parameters:

name – string - Bucket name

Returns:

bool - Success of operation

get_bucket(name)

Method to get a bucket instance from its name

Parameters:

name – string - Bucket name

Returns:

Bucket - Bucket instance to handle files

get_buckets()

Method that gets all buckets from the Google Store

Returns:

list - containing the Bucket instances

get_hadoop_config()

Hadoop config for PySpark

Returns:

Hadoop config

Return type:

dict

get_spark_session_config()

Spark session config for PySpark

Returns:

Spark session config

Return type:

dict

list(return_type='array')

Method to list buckets from the Google Store

Parameters:

return_type – string - Type of the result: “array” or “str”

Returns:

str or array - Contains information about buckets on the Google Store

message_from_error(func, error)
metadata_prefix = ''
remove_bucket(name)

Method to delete a bucket in the Google store

Parameters:

name – string - Bucket name

Returns:

bool - Success of operation

remove_bucket_not_empty(name)

Method to delete a non-empty bucket in the Google Store

Parameters:

name – string - Bucket name

Returns:

bool - Success of operation

spark_prefix(bucket)
class forepaas.dwh.LogicalObject

Bases: object

create(content)
get(name)
list()
remove(name)
update(name, content)
update_status(status)
class forepaas.dwh.Metas

Bases: object

table(table_name, database=None)

Method that returns the table

Parameters:

table_name – str - table name

table_attribute(db_name, table_name, attribute_name)

Method that returns the attribute of the table

Parameters:
  • db_name – str - database name

  • table_name – str - table name

  • attribute_name – str - attribute name

Returns:

dict - attribute

table_attribute_delete(db_name, table_name, attribute_name)

Method that deletes the attribute of the table

Parameters:
  • db_name – str - database name

  • table_name – str - table name

  • attribute_name – str - attribute name

Returns:

int - status code

table_attribute_update(db_name, table_name, attribute)

Method that updates the attribute of the table

Parameters:
  • db_name – str - database name

  • table_name – str - table name

  • attribute – dict - attribute data

Returns:

int - status code

table_attributes(db_name, table_name)

Method that returns the list of attributes of the table

Parameters:
  • db_name – str - database name

  • table_name – str - table name

Returns:

list - list of attributes

table_update(table_name, data, database=None)

Method that updates the table

Parameters:
  • table_name – str - table name

  • data – dict - table data

Returns:

int - status code

tables()

Method that returns the list of tables

Returns:

list - list of tables

tables_attributes()

Method that returns the list of attributes of all tables

Returns:

list - list of attributes

class forepaas.dwh.Protocol(configuration, params)

Bases: object

configure()
delete(**kwargs)
get(**kwargs)
get_error_msg(e, class_name, func, file)
get_parameter(key, default_value=None)

Search for a parameter by key. The job params are searched first, then configuration["parameters"], then configuration itself. If the key is still not found, it is looked up in table_dwh.

Parameters:
  • key (str) – Key to search for

  • default_value (mixed, optional) – Default value if the key is not found, defaults to None

Returns:

Value found

Return type:

mixed

get_table_parameter(key, default_value=None)

Search for a parameter by key in table_dwh, or inside the table_dwh parameters

Parameters:
  • key (str) – Key to search for

  • default_value (mixed, optional) – Default value if the key is not found, defaults to None

Returns:

Value found

Return type:

mixed
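To make the two lookup orders above concrete, here is a minimal sketch that reproduces them over plain dictionaries. lookup_parameter and lookup_table_parameter are hypothetical helpers, not part of forepaas, and the order used inside table_dwh (top-level keys before table_dwh["parameters"]) is an assumption, since the docstring does not say which is checked first.

```python
from typing import Any, Mapping, Optional


def lookup_table_parameter(key: str, table_dwh: Mapping[str, Any],
                           default_value: Any = None) -> Any:
    """Hypothetical: top-level table_dwh keys first, then table_dwh["parameters"]."""
    if key in table_dwh:
        return table_dwh[key]
    parameters = table_dwh.get("parameters") or {}
    return parameters.get(key, default_value)


def lookup_parameter(key: str,
                     job_params: Mapping[str, Any],
                     configuration: Mapping[str, Any],
                     table_dwh: Optional[Mapping[str, Any]] = None,
                     default_value: Any = None) -> Any:
    """Hypothetical: job params -> configuration["parameters"] -> configuration -> table_dwh."""
    sources = (job_params, configuration.get("parameters") or {}, configuration)
    for source in sources:
        if key in source:
            return source[key]
    # Last resort: the table definition, as in get_table_parameter().
    if table_dwh is not None:
        return lookup_table_parameter(key, table_dwh, default_value)
    return default_value
```

With job_params = {"condition": "week_day = 1"} and configuration = {"parameters": {"sep": ";"}, "sep": ","}, looking up "sep" returns ";" because configuration["parameters"] is searched before the top level of configuration.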

pre_get(to, filename)
pre_list(path)
put(**kwargs)
class forepaas.dwh.S3Bucket(client, name)

Bases: object

Bucket is an object to interact with a specific bucket from the ForePaaS Datastore (fp-datastore). Example:

client = S3('play.min.io:9000')
BUCKET_NAME = "example_bucket"
bucket = Bucket(client, BUCKET_NAME)

Parameters:
  • client (S3) – Instance of S3Client

  • name (str) – Name of the bucket

Attributes:
  • S3 (S3) – Instance of S3Client

  • name – Name of the bucket

delete(object_name)

Delete one or more files from the fp-datastore bucket

Parameters:

object_name (str or array) – Name of the object(s) to delete

fcopy_to(name, object_name, object_source, **kwargs)

Copy a file from this bucket to a destination bucket in fp-datastore

Parameters:
  • name (str) – Name of the bucket for the new object.

  • object_name (str) – Name of the new object.

  • object_source (str) – Name of the object to be copied.

  • **kwargs – Remaining keyword arguments are passed to self.client.copy_object

fget(object_name, file_path, **kwargs)

Download an object from the fp-datastore bucket to a local path

Parameters:
  • object_name (str) – Name of the object to get.

  • file_path (str) – Path on the local filesystem to which the object data will be written.

  • **kwargs – Remaining keyword arguments are passed to self.client.fget_object

Returns:

Object stat info from the server

Return type:

Object

fput(object_name, file_path, **kwargs)

Upload a file to the fp-datastore bucket

Parameters:
  • object_name (str) – Name of the object to upload.

  • file_path (str) – Path on the local filesystem from which the object data will be read.

  • **kwargs – Remaining keyword arguments are passed to self.client.fput_object

Returns:

Object etag computed by the server

Return type:

str

get(file_name, **kwargs)

Get an object from the fp-datastore bucket by file name

Parameters:
  • file_name (str) – Name of the file in the bucket

  • **kwargs – Remaining keyword arguments are passed to self.client.get_object

Returns:

Response from the fp-datastore request

Return type:

urllib3.response.HTTPResponse

Example:
>>> bucket.get('folder/file_name')
>>> data = bucket.get('file_name')
>>> for d in data.stream(32*1024):
...     sdk.log("info", d)

get_content(file_name, **kwargs)

Get the content of a file from the fp-datastore bucket

Parameters:
  • file_name (str) – Name of the file in the bucket

  • **kwargs – Remaining keyword arguments are passed to self.client.get_object

Returns:

Content of the file

Return type:

str

list(metadata=False, recursive=True, prefix=None, **kwargs)

List files from the fp-datastore bucket

Parameters:
  • metadata (bool, optional) – Get metadata for all listed files, defaults to False

  • recursive (bool, optional) – Whether to list the contents of sub-folders as well, defaults to True

  • **kwargs – Remaining keyword arguments are passed to self.client.list_objects_v2

Raises:

Exception

Returns:

List of bucket files

Return type:

[Object]

list_filename(return_type='array', contains='', recursive=True, prefix=None, **kwargs)

List file names from the fp-datastore bucket

Parameters:
  • return_type (str, optional) – Type of return you want, defaults to 'array'

  • recursive (bool, optional) – Whether to list the contents of sub-folders as well, defaults to True

  • contains (str, optional) – Keep only file names containing this value, defaults to ''

  • **kwargs – Remaining keyword arguments are passed to self.client.list_objects_v2

Raises:

Exception

Returns:

List of bucket file names

Return type:

array / str
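The prefix, recursive and contains arguments of list() and list_filename() are easiest to reason about on a flat list of object keys. The sketch below is a hypothetical, in-memory emulation of how these options are commonly interpreted by S3/MinIO-style listings (a non-recursive listing reports each first-level sub-folder once, with a trailing "/"); it does not call fp-datastore, and details such as whether contains is applied before or after folders are collapsed are assumptions.

```python
from typing import Iterable, List, Optional


def filter_object_names(names: Iterable[str], prefix: Optional[str] = None,
                        recursive: bool = True, contains: str = "") -> List[str]:
    """Hypothetical emulation of prefix / recursive / contains over object keys."""
    prefix = prefix or ""
    seen = set()
    results: List[str] = []
    for name in names:
        if not name.startswith(prefix):
            continue  # outside the requested "folder"
        rest = name[len(prefix):]
        if not recursive and "/" in rest:
            # Non-recursive: report the first-level sub-folder instead of its contents.
            name = prefix + rest.split("/", 1)[0] + "/"
        if contains in name and name not in seen:
            seen.add(name)
            results.append(name)
    return results
```

For example, with the keys "hello/test.jpg" and "hello/sub/a.csv", a non-recursive listing under the prefix "hello/" yields "hello/test.jpg" and "hello/sub/".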

message_from_error(func, error)
put(object_name, data, length=None, **kwargs)

Upload an object to the fp-datastore bucket

Parameters:
  • object_name (str) – Name of the object to upload

  • data (io.RawIOBase) – Data to upload to S3

  • length (int) – Data length

  • **kwargs – Remaining keyword arguments are passed to self.client.put_object

Returns:

Object etag computed by the server

Return type:

str
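put() takes a stream plus its length rather than a path. The sketch below prepares both from in-memory bytes or text, assuming (as the **kwargs note suggests) that put() forwards data and length to an S3/MinIO-style put_object, which reads from any object with a read() method and expects the size in bytes. prepare_upload and prepare_text_upload are hypothetical helpers.

```python
import io
from typing import Tuple


def prepare_upload(payload: bytes) -> Tuple[io.BytesIO, int]:
    """Wrap raw bytes in a readable stream and return it with its length in bytes."""
    return io.BytesIO(payload), len(payload)


def prepare_text_upload(text: str, encoding: str = "utf-8") -> Tuple[io.BytesIO, int]:
    """Encode text first: the length passed to put() counts bytes, not characters."""
    return prepare_upload(text.encode(encoding))
```

The result would then be passed as data, length = prepare_text_upload("a;b\n1;2\n") followed by bucket.put("folder/file.csv", data, length), where bucket comes from get_bucket().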

put_request(url, path='', data=None, method='GET', headers={})

Get an object with an HTTP request and upload it to the fp-datastore bucket

Parameters:
  • url (str) – URL to get the file from

  • path (str, optional) – Where to upload the file in the fp-datastore bucket, defaults to ''

  • data (optional) – Dictionary, list of tuples, bytes, or file-like object to send in the body of the request

  • method (str, optional) – Method used by the request to get the file, defaults to 'GET'

  • headers (dict, optional) – Headers sent in the request, defaults to {}

class forepaas.dwh.S3Store(params)

Bases: Datastore

DATASTORE is an object to interact with the ForePaaS Datastore (fp-datastore). Example:

data_store = DATASTORE(config)
bucket = data_store.get_bucket(BUCKET_NAME)

Parameters:
  • params (dict) – params to connect to fp-datastore (example values: localhost:9000, ROOT, ROOT)

  • params["secure"] – whether the datastore is SSL secured, example: True

Attributes:

s3 – S3 instance to handle requests to fp-datastore

bucket_exists(name)

Find out whether a bucket exists

Parameters:

name (str) – Bucket name

Returns:

bool - True if the bucket exists, False otherwise

create_bucket(name)

Method to add a bucket in the fp-datastore

Parameters:

name (str) – Bucket name

Returns:

bool - Success of operation

get_bucket(name)

Method to get a bucket instance from its name

Parameters:

name (str) – Bucket name

Returns:

Bucket - Bucket instance to handle files

get_buckets()

Method that gets all buckets from fp-datastore

Returns:

list - Bucket instances

get_hadoop_config()

Hadoop config for PySpark

Returns:

Hadoop config

Return type:

dict

get_spark_session_config()

Spark session config for PySpark

Returns:

Spark session config

Return type:

dict
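One way to use the two dicts above when building a Spark session is sketched below. It assumes both methods return flat dicts of configuration keys to values (their contents are not documented here) and relies on Spark's documented behaviour of forwarding properties prefixed with "spark.hadoop." to the Hadoop configuration. hadoop_to_spark_conf and apply_spark_config are hypothetical helpers; apply_spark_config only calls builder.config(key, value), which pyspark's SparkSession.builder provides.

```python
from typing import Any, Dict


def hadoop_to_spark_conf(hadoop_config: Dict[str, Any]) -> Dict[str, Any]:
    """Prefix Hadoop keys with "spark.hadoop." so Spark forwards them to Hadoop."""
    return {
        key if key.startswith("spark.hadoop.") else "spark.hadoop." + key: value
        for key, value in hadoop_config.items()
    }


def apply_spark_config(builder: Any, config: Dict[str, Any]) -> Any:
    """Call builder.config(key, value) for every entry and return the builder."""
    for key, value in config.items():
        builder = builder.config(key, value)
    return builder
```

With pyspark installed, a session could then be created with apply_spark_config(SparkSession.builder, {**store.get_spark_session_config(), **hadoop_to_spark_conf(store.get_hadoop_config())}).getOrCreate(), where store is a datastore instance.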

list(return_type='array')

Method to list buckets from fp-datastore

Parameters:

return_type (str) – Type you want to get, "array" or "str", defaults to 'array'

Returns:

str or array - Information about the buckets on fp-datastore

message_from_error(func, error)
remove_bucket(name)

Method to delete a bucket in the fp-datastore

Parameters:

name (str) – Bucket name

Returns:

bool - Success of operation

remove_bucket_not_empty(name)

Method to delete a non-empty bucket in the fp-datastore

Parameters:

name (str) – Bucket name

Returns:

bool - Success of operation
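The reason for two removal methods is that S3-style stores refuse to delete a bucket that still holds objects, so a non-empty bucket has to be emptied first. The in-memory class below is a hypothetical stand-in, not the forepaas API, that sketches this two-step behaviour; whether the real remove_bucket() returns False or raises on a non-empty bucket is not stated here, and this sketch simply returns False.

```python
from typing import Dict, Set


class InMemoryStore:
    """Hypothetical stand-in for an S3-style store (NOT the forepaas API)."""

    def __init__(self) -> None:
        self.buckets: Dict[str, Set[str]] = {}

    def create_bucket(self, name: str) -> bool:
        self.buckets.setdefault(name, set())
        return True

    def remove_bucket(self, name: str) -> bool:
        # Like S3 DeleteBucket: refuse to drop a bucket that still holds objects.
        if self.buckets.get(name):
            return False
        return self.buckets.pop(name, None) is not None

    def remove_bucket_not_empty(self, name: str) -> bool:
        # Delete every object first, then remove the now-empty bucket.
        if name not in self.buckets:
            return False
        self.buckets[name].clear()
        return self.remove_bucket(name)
```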

spark_prefix(bucket)
class forepaas.dwh.Sources

Bases: object

list_database_tables(name)

List tables of a database

Parameters:

name (str) – name of the database

Returns:

list of tables

Return type:

list

update(name, data, secret=False)

Update a source

Parameters:
  • name (str) – name of the source

  • data (dict) – data of the source

  • secret (bool, optional) – if True, the data will be encrypted, defaults to False

Returns:

updated source

Return type:

dict

class forepaas.dwh.VirtualAttributes

Bases: object

list()

Method that returns the list of virtual attributes

Returns:

list - list of virtual attributes

forepaas.dwh.bulk_insert(connector, table, data, source_default_schema={}, batch_size=None, **kwargs)

Insert a dataframe into a physical table using the given connector, splitting the data into small batches

Parameters:
  • connector (forepaas.dwh.Connector) – connector returned by connect() to use for the insertion

  • table (str) – name of the destination table the data is loaded into

  • data (dataframe) – dataframe of data to insert

  • source_default_schema (dict, optional) – source schema, defaults to {}

  • batch_size (int, optional) – insert batch size, defaults to the value returned by automatic_batch_size

Returns:

tuple of the statistics of the insertion

Return type:

Tuple(stats, error)
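The batching and the (stats, error) return value can be sketched as follows. insert_in_batches is a hypothetical helper: it takes the insert step as a callable instead of a forepaas connector, uses a fixed default batch size of 1000 instead of automatic_batch_size, and its stats keys are illustrative, since the real stats structure is not documented here. A list stands in for the dataframe; with pandas, df.iloc[start:start + batch_size] produces the same chunks.

```python
from typing import Any, Callable, Dict, Optional, Sequence, Tuple


def insert_in_batches(rows: Sequence[Any],
                      insert_batch: Callable[[Sequence[Any]], None],
                      batch_size: int = 1000) -> Tuple[Dict[str, int], Optional[Exception]]:
    """Split rows into batches, insert each one, and return (stats, error)."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    stats = {"inserted": 0, "batches": 0}
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        try:
            insert_batch(batch)
        except Exception as error:
            # Stop at the first failing batch and report what succeeded so far.
            return stats, error
        stats["inserted"] += len(batch)
        stats["batches"] += 1
    return stats, None
```

Returning the error instead of raising lets the caller log partial progress (the stats) alongside the failure.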

forepaas.dwh.connect(connection_str, account_key=None, **kwargs)

Connect to a connector class and return the instantiated class

Parameters:
  • connection_str (str) – Address of the connector, protocol or datastore (for example: dwh/OBJECT_NAME or dwh/DB_NAME/TABLE_NAME)

  • account_key (str, optional) – account key, defaults to None

Returns:

Instance of a connector, protocol, datastore or bucket

Return type:

forepaas.connector.Connector / forepaas.protocol.Protocol / forepaas.datastore.Datastore / forepaas.datastore.Bucket
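The connection strings used with connect() on this page follow a few shapes: "data_store", "data_store/<bucket>", "dwh/OBJECT_NAME" and "dwh/DB_NAME/TABLE_NAME" (the last shape is used both for connector tables and for protocol files such as dwh/chicago_files/chicago_calendar.csv). describe_connection_str below is a hypothetical helper that only classifies these documented shapes; it does not reproduce how connect() actually resolves them.

```python
from typing import Dict, Optional


def describe_connection_str(connection_str: str) -> Dict[str, Optional[str]]:
    """Hypothetical: classify the connection-string shapes shown on this page."""
    root, _, rest = connection_str.strip("/").partition("/")
    parts = [part for part in rest.split("/") if part]
    if root == "data_store" and len(parts) <= 1:
        # "data_store" -> whole datastore, "data_store/<bucket>" -> one bucket
        return {"target": "bucket" if parts else "datastore",
                "bucket": parts[0] if parts else None}
    if root == "dwh" and len(parts) == 1:
        return {"target": "object", "object": parts[0]}
    if root == "dwh" and len(parts) == 2:
        # Connector table or protocol file: the string alone does not tell which.
        return {"target": "table_or_file", "database": parts[0], "name": parts[1]}
    raise ValueError(f"Unrecognised connection string: {connection_str!r}")
```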

forepaas.dwh.extract(connector, params={}, apply_rules=False)

Extract an iterator of dataframes from the connector

Parameters:
  • connector (forepaas.dwh.Connector) – Connector returned by connect()

  • params (dict, optional) – Params sent to extract_dataframe, defaults to {}

  • apply_rules (bool, optional) – Apply rules from the DWH, defaults to False

Raises:

Exception – Wrong connector usage
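Because extract() returns an iterator of dataframes rather than a single dataframe, each chunk can be processed as it arrives instead of holding the whole table in memory. consume_chunks is a hypothetical helper, and plain lists stand in for dataframes so the sketch runs without forepaas or pandas; with real dataframes, len(chunk) is the row count of each chunk.

```python
from typing import Callable, Iterable, Sized, TypeVar

Chunk = TypeVar("Chunk", bound=Sized)


def consume_chunks(chunks: Iterable[Chunk], handle: Callable[[Chunk], None]) -> int:
    """Process each chunk as it arrives and return the total number of rows."""
    total = 0
    for chunk in chunks:
        handle(chunk)  # e.g. transform and load this chunk only
        total += len(chunk)
    return total
```

An iterator can only be consumed once; when all chunks fit in memory, pandas' pd.concat(list(chunks)) combines them into one dataframe instead.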

forepaas.dwh.get_raw(protocol)

Extract the raw file from a protocol and write it to the datalake

Parameters:

protocol (forepaas.dwh.Protocol) – Protocol to use for the extraction

Returns:

temporary protocol string, or False if something went wrong

Return type:

string / bool

forepaas.dwh.update_metas(params=None)

Update metas in the dataplant DWH

Parameters:

params (dict) – Action params configured in the operator

Returns:

The update_metas extracted to be sent to the status

forepaas.dwh.update_table_metas(db_name, table_name, params=None)

Update metas for a specific table in the dataplant DWH

Parameters:
  • db_name (str) – database name

  • table_name (str) – table name

  • params (dict) – Action params configured in the operator

Returns:

The update_metas extracted to be sent to the status