Data Manager
Examples
Datastore connect
from forepaas.dwh import connect
data_store = connect("data_store")
# Get the bucket and upload an image from a URL to the path forepaas/test.jpg.
# Finally, get the image back from the bucket.
bucket_test = data_store.get_bucket("test")
lists = bucket_test.list(recursive=True)
bucket_test.put_request("https://i.stack.imgur.com/r8jTK.jpg", path="forepaas/test.jpg")
data = bucket_test.get("forepaas/test.jpg")
# Create a bucket if it does not already exist
if not data_store.bucket_exists("test-exists"):
    data_store.create_bucket("test-exists")
# Connect directly to the bucket test and remove the file
bucket_test2 = connect("data_store/test")
bucket_test2.delete("forepaas/test.jpg")
Connectors connect
import logging
from forepaas.dwh import connect
logger = logging.getLogger(__name__)
# connect to the source connector
source = connect("dwh/data_prim/chicago_calendar")
# extract a dataframe from the source, keeping only the rows where week_day = 1
extract_parameters = {"condition": "week_day = 1"}
df = source.extract(extract_parameters)
logger.info(f"There are {len(df)} rows with week_day = 1 in the table chicago_calendar")
Protocols connect
import logging
from forepaas.dwh import connect
from forepaas.core.settings import CONFIG
import pandas as pd
logger = logging.getLogger(__name__)
# connect to the protocol connector and download the file
# It will download the file in the worker file directory (CONFIG["file_directory"])
filename = "chicago_calendar.csv"
protocol = connect(f"dwh/chicago_files/{filename}")
protocol.get()
df = pd.read_csv(f"{CONFIG['file_directory']}/{filename}", sep=";")
column_names = ", ".join(list(df.columns))
logger.info(f"File {filename} has columns {column_names} and it has {len(df)} rows")
Get raw
from forepaas.worker.protocol import get_raw
from forepaas.worker.connect import connect
# You will need a protocol instance to extract from
protocol = connect("dwh/chicago_files/chicago_calendar.csv")
source = get_raw(protocol)
API Reference
- class forepaas.dwh.AzureBlobStorage(params)
Bases:
Datastore
DATASTORE is an object to interact with ForePaaS Datastore (fp-datastore)
- Example:
>>> data_store = DATASTORE(config)
>>> bucket = data_store.get_bucket(BUCKET_NAME)
- Parameters:
params – dict - params to connect to fp-datastore. Example values: localhost:9000, ROOT, ROOT, and True for params["secure"] (whether the datastore is SSL-secured).
- Attributes:
azure AzureBlobStorage instance to handle request to fp-datastore
- bucket_exists(name)
Find out if a bucket exists or not
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- create_bucket(name)
Method to add a bucket in the fp-datastore
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- get_bucket(name)
Method to get a bucket instance from name
- Parameters:
name – string - Bucket name
- Returns:
Bucket - Bucket instance to handle files
- get_buckets()
Method that gets all buckets from fp-datastore
- Returns:
list - containing the instance of Bucket
- get_hadoop_config()
Hadoop config for PySpark
- Returns:
Hadoop config
- Return type:
dict
- get_spark_session_config()
Spark session config for PySpark
- Returns:
Spark session config
- Return type:
dict
- list(return_type='array')
Method to list buckets from fp-datastore
- Parameters:
return_type – string - Determine the type you want to get “array” or “str”
- Returns:
str or array - Contains information about buckets on fp-datastore
- message_from_error(func, error)
- remove_bucket(name)
Method to delete a bucket in the fp-datastore
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- remove_bucket_not_empty(name)
Method to delete a non-empty bucket in the fp-datastore
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- spark_prefix(bucket)
- class forepaas.dwh.AzureBucket(client, name)
Bases:
object
Bucket is an object to interact with a specific Bucket from ForePaaS Datastore (fp-datastore)
- Example:
>>> client = azure.storage.blob.BlobServiceClient(connection_string)
>>> BUCKET_NAME = "example_bucket"
>>> bucket = Bucket(client, BUCKET_NAME)
- Parameters:
client – Azure - Instance of azure.storage.blob.BlobServiceClient
name – Name of the bucket
- Attributes:
azure – Azure - Instance of azure.storage.blob.BlobServiceClient
name – Name of the bucket
- delete(object_name)
Delete multiple/single file in fp-datastore’s bucket
- Parameters:
object_name (str or array) – Name of the object(s) to delete
- fcopy_to(name, object_name, object_source, **kwargs)
Copy file from bucket to a new bucket in fp-datastore
- Parameters:
name (str) – Name of the bucket for new object.
object_name (str) – Name of the new object.
object_source (str) – Name of the object to be copied.
**kwargs – remaining keyword arguments are passed to self.client.copy_object
- fget(object_name, file_path, **kwargs)
Get an object from fp-datastore’s bucket to a local path
- Parameters:
object_name (str) – Name of the object to get.
file_path (str) – Path on the local filesystem to which the object data will be written.
**kwargs – remaining keyword arguments are passed to self.client.fget_object
- Returns:
Object stat info from the server
- Return type:
Object
- fput(object_name, file_path, **kwargs)
Put a file to fp-datastore’s bucket
- Parameters:
object_name (str) – Name of the object to upload.
file_path (str) – Path on the local filesystem from which object data will be read.
**kwargs – remaining keyword arguments are passed to self.client.fput_object
- Returns:
Object etag computed by the server
- Return type:
str
- get(file_name, **kwargs)
Get an object from fp-datastore’s bucket by file name
- Parameters:
file_name (str) – File name from bucket you want to get
**kwargs – remaining keyword arguments are passed to self.client.get_object
- Returns:
Response from fp-datastore request
- Return type:
urllib3.response.HTTPResponse
- Example:
>>> bucket.get('folder/file_name')
>>> data = bucket.get('file_name')
>>> for d in data.stream(32*1024):
...     sdk.log("info", d)
- get_content(file_name, **kwargs)
Get the content of a file from fp-datastore’s bucket
- Parameters:
file_name (str) – File name from bucket you want to get
**kwargs – remaining keyword arguments are passed to self.client.get_object
- Returns:
Content of the file
- Return type:
str
- list(metadata=False, recursive=True, **kwargs)
List files from fp-datastore’s bucket
- Parameters:
metadata (bool, optional) – Get metadata for all files listed, defaults to False
recursive (bool, optional) – Determine if you want to list all folders as well, defaults to True
**kwargs – remaining keyword arguments are passed to self.client.list_objects_v2
- Raises:
Exception – [description]
- Returns:
List of bucket files
- Return type:
[Object]
- list_filename(return_type='array', contains='', recursive=True, **kwargs)
List file names from fp-datastore’s bucket
- Parameters:
return_type (str, optional) – Type of return you want, defaults to ‘array’
recursive (bool, optional) – Determine if you want to list all folders as well, defaults to True
contains (str, optional) – Filter file list with file_name containing the value, defaults to ‘’
**kwargs – remaining keyword arguments are passed to self.client.list_objects_v2
- Raises:
Exception – [description]
- Returns:
List of bucket files
- Return type:
[array, str]
- listen_notification(path=None, suffix=None, events=None)
Listen for notifications on a bucket. Filters for prefix, suffix and events can also be provided. No bucket notification needs to be configured beforehand to use this API.
- Parameters:
path (str) – Object key prefix to filter notifications for
suffix (str) – Object key suffix to filter notifications for
events (list) – Enable notifications for specific event types
- Returns:
Iterator of azure blob storage event
- Return type:
iterator
- message_from_error(func, error)
- put(object_name, data, length=None, **kwargs)
Put an object to fp-datastore’s bucket
- Parameters:
object_name (str) – Name of the object to upload
data (io.RawIOBase) – Data to upload to azure blob storage
length (int) – Data length
**kwargs – remaining keyword arguments are passed to self.client.put_object
- Returns:
Object etag computed by the server
- Return type:
str
- put_request(url, path='', data=None, method='GET', headers=None)
Get an object from an http request and upload it to fp-datastore’s bucket
- Parameters:
url (str) – Url to get the file from
path (str, optional) – Where to upload the file in fp-datastore’s bucket, defaults to ‘’
data – (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the request.
method (str, optional) – Method used by request to get file, defaults to ‘GET’
headers (dict, optional) – Headers sent in request, defaults to {}
- class forepaas.dwh.Bucket(client, name, bucket_type='s3')
Bases:
object
Bucket is an object to interact with a specific Bucket from any ForePaaS compatible Datastore
- Parameters:
client – Instance of client
name – Name of the bucket
- Example:
>>> client = boto3.client('localhost:9000')
>>> BUCKET_NAME = "example_bucket"
>>> bucket = Bucket(client, BUCKET_NAME)
- delete(object_name)
Delete multiple/single file in store’s bucket
- Parameters:
object_name (str or array) – Name of the object(s) to delete
- fcopy_to(bucket_name, object_name, object_source, **kwargs)
Copy file from bucket to a new bucket in fp-datastore
- Parameters:
bucket_name (str) – Name of the bucket for new object.
object_name (str) – Name of the new object.
object_source (str) – Name of the object to be copied.
**kwargs – remaining keyword arguments are passed to provider copy objects function
- fget(object_name, file_path, **kwargs)
Get an object from store’s bucket to a local path
- Parameters:
object_name (str) – Name of the object to get.
file_path (str) – Path on the local filesystem to which the object data will be written.
**kwargs – remaining keyword arguments are passed to provider fget function
- Returns:
Object stat info from the server
- Return type:
Object
- fput(object_name, file_path, **kwargs)
Put a file to store’s bucket
- Parameters:
object_name (str) – Name of the object to upload.
file_path (str) – Path on the local filesystem from which object data will be read.
**kwargs – remaining keyword arguments are passed to provider fput function
- Returns:
Object etag computed by the server
- Return type:
str
- get(file_name, **kwargs)
Get an object from store’s bucket by file name
- Parameters:
file_name (str) – File name from bucket you want to get
**kwargs – remaining keyword arguments are passed to provider get object function
- Returns:
Content of the file
- Return type:
Streamer
- Example:
>>> bucket.get('folder/file_name')
>>> data = bucket.get('file_name')
- get_content(file_name, **kwargs)
Get the content of a file from store’s bucket
- Parameters:
file_name (str) – File name from bucket you want to get
**kwargs – remaining keyword arguments are passed to provider get object function
- Returns:
Content of the file
- Return type:
str
- list(metadata=True, recursive=True, **kwargs)
List files from store’s bucket
- Parameters:
metadata (bool, optional) – Get metadata for all files listed, defaults to True
**kwargs – remaining keyword arguments are passed to provider function to list files
- Raises:
Exception – [description]
- Returns:
List of bucket files
- Return type:
[Object]
- list_filename(return_type='array', contains='', recursive=True, **kwargs)
List file names from storage’s bucket
- Parameters:
return_type (str, optional) – Type of return you want, defaults to ‘array’
contains (str, optional) – Filter file list with file_name containing the value, defaults to ‘’
**kwargs – remaining keyword arguments are passed to provider function to list files
- Raises:
Exception – [description]
- Returns:
List of bucket files
- Return type:
[array, str]
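The effect of the contains and return_type arguments of list_filename can be sketched on a plain list of object names. This is a minimal illustration and not the SDK's code: the standalone function, the sample names, and the choice of one name per line for the "str" form are all assumptions.

```python
def list_filename(names, return_type="array", contains=""):
    """Filter object names by substring and return them as a list or a single string."""
    # An empty `contains` matches every name, mirroring the documented default.
    matches = [name for name in names if contains in name]
    if return_type == "str":
        # Assumption: the string form is one name per line.
        return "\n".join(matches)
    return matches


# Hypothetical bucket content used only for the illustration.
names = ["forepaas/test.jpg", "hello/test.jpg", "hello/readme.txt"]

jpg_files = list_filename(names, contains=".jpg")
hello_as_text = list_filename(names, return_type="str", contains="hello/")
```

With the real SDK the equivalent call would be made on a bucket instance, for example bucket.list_filename(contains=".jpg").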
- listen_notification(path=None, suffix=None, events=None)
Listen for notifications on a bucket. Filters for prefix, suffix and events can also be provided. No bucket notification needs to be configured beforehand to use this API.
- Parameters:
path (str) – Object key prefix to filter notifications for
suffix (str) – Object key suffix to filter notifications for
events (list) – Enable notifications for specific event types
- Returns:
Iterator of event
- Return type:
iterator
- put(object_name, data, length=None, **kwargs)
Put an object to store’s bucket
- Parameters:
object_name (str) – Name of the object to upload
data (str) – Data to upload to data store
length (int) – Data length
**kwargs – remaining keyword arguments are passed to provider put object function
- Returns:
Object etag computed by the server
- Return type:
str
- put_request(url, path='', data=None, method='GET', headers=None)
Get an object from an http request and upload it to store’s bucket
- Parameters:
url (str) – Url to get the file from
path (str, optional) – Where to upload the file in store’s bucket, defaults to ‘’
data – (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the request.
method (str, optional) – Method used by request to get file, defaults to ‘GET’
headers (dict, optional) – Headers sent in request, defaults to {}
- stat(object_name, **kwargs)
Get stat for a file in fp-datastore’s bucket
- Parameters:
object_name (str or array) – Name of the object to get stat
**kwargs – remaining keyword arguments are passed to provider object’s stat function
- exception forepaas.dwh.BucketNotFound
Bases:
Exception
Bucket does not exist in the data_store
- exception forepaas.dwh.BucketUnauthorized
Bases:
Exception
Bucket is from the dataplant system and should not be accessed
- class forepaas.dwh.Connector(configuration, params)
Bases:
object
Class to handle Connectors
- Parameters:
configuration (dict) – Configuration sent by the operator containing dwh information about sources and tables
params (dict) – Action params configured in the control plane
- configure(*args, **kwargs)
- delete(*args, **kwargs)
- escape_single_quote(str)
Escape single quote
- Parameters:
str – string to escape
- Returns:
escaped string
- extract(*args, **kwargs)
- get_endpoint_parameter(key, default_value=None)
Search for parameter by key. The params from the job are searched first, then table_dwh.
- Parameters:
key (str) – Key to search for
default_value (mixed, optional) – Default value if the key is not found, defaults to None
- Returns:
Value found
- Return type:
mixed
- get_error(class_name, func, err, _file)
- get_limit(limit=-1)
Get the limit of the connector
- Parameters:
limit – limit of the connector
- Returns:
limit of the connector
- get_parameter(key, default_value=None)
Search for parameter by key. The params from the job are searched first, then configuration[“parameters”] and configuration. If the key is still not found, table_dwh is searched.
- Parameters:
key (str) – Key to search for
default_value (mixed, optional) – Default value if the key is not found, defaults to None
- Returns:
Value found
- Return type:
mixed
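The lookup order described for get_parameter can be sketched as a chain of dictionaries searched in priority order. This is a minimal illustration and not the library's implementation: the standalone function signature and the sample dictionaries are assumptions, and the real method may treat empty or None values differently.

```python
def get_parameter(key, params, configuration, table_dwh, default_value=None):
    """Return the first value found for `key`, searching the layers in priority order."""
    layers = (
        params,                               # 1. action params from the job
        configuration.get("parameters", {}),  # 2. configuration["parameters"]
        configuration,                        # 3. the configuration itself
        table_dwh,                            # 4. table-level settings
    )
    for layer in layers:
        if key in layer:
            return layer[key]
    return default_value


# Hypothetical inputs used only for the illustration.
params = {"limit": 10}
configuration = {"parameters": {"limit": 100, "sep": ";"}, "type": "csv"}
table_dwh = {"encoding": "utf-8"}

limit = get_parameter("limit", params, configuration, table_dwh)
sep = get_parameter("sep", params, configuration, table_dwh)
encoding = get_parameter("encoding", params, configuration, table_dwh)
missing = get_parameter("unknown", params, configuration, table_dwh, default_value="n/a")
```

Here the job value of limit shadows the one in configuration["parameters"], which is the practical consequence of the search order.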
- get_perimeter_query(params)
Get the perimeter query
- Parameters:
params – params of the action
- Returns:
perimeter query
- get_return_type(return_type='')
Get the return type of the connector
- Parameters:
return_type – return type of the connector
- Returns:
return type of the connector
- get_segmentation_query(params)
Get the segmentation query
- Parameters:
params – params of the action
- Returns:
segmentation query
- get_source_default_schema()
Get source default schema
- Returns:
source default schema
- Return type:
dict
- get_table_parameter(key, default_value=None)
Search for parameter by key in table_dwh or inside table_dwh parameters
- Parameters:
key (str) – Key to search for
default_value (mixed, optional) – Default value if the key is not found, defaults to None
- Returns:
Value found
- Return type:
mixed
- handle_api_extraction(lines, limit=-1, return_type='dict')
The result of this function should be returned at the end of an extract for an API-type connector such as Facebook, GA, YouTube… It adds a log to the account in the Data Manager and then returns the lines, depending on the limit parameter
- Parameters:
lines (list(dict)) – Extracted lines from api
- Returns:
Extracted lines limited or not
- Return type:
list(dict)
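The limiting step of handle_api_extraction can be sketched as a slice over the extracted lines. This is only an illustration under assumptions: the helper name limit_lines is hypothetical, a negative limit is taken to mean "no limit" because the documented default is -1, and the logging to the Data Manager account is left out.

```python
def limit_lines(lines, limit=-1):
    """Return every line when limit is negative, otherwise at most `limit` lines."""
    if limit < 0:
        return lines
    return lines[:limit]


# Hypothetical lines as an API connector might extract them.
rows = [{"id": 1}, {"id": 2}, {"id": 3}]

all_rows = limit_lines(rows)
first_two = limit_lines(rows, limit=2)
```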
- handle_query_cursor(cursor, return_type='')
Handle query cursor
- Parameters:
cursor – cursor of the query
return_type – return type of the connector
- Returns:
query result
- insert(*args, **kwargs)
- linearize(data, params)
Method used to linearize data from objects that may each contain multiple lines
- Parameters:
data (dict or list) – data to linearize
- Returns:
list of linearized data
- Return type:
list
- linearize_no_main_node(data, params)
Method used to linearize data from objects that may each contain multiple lines
- Parameters:
data (dict or list) – data to linearize
- Returns:
list of linearized data
- Return type:
list
- load(*args, **kwargs)
- merge_conditions(condition, cond)
- select(*args, **kwargs)
- update(*args, **kwargs)
- class forepaas.dwh.DM
Bases:
object
Class to handle Data Manager Operation
- Parameters:
conf (dict) – Configuration sent by the operator containing dwh information about sources and tables
params (dict) – Action params configured in the control plane
- add_account_logstatus(status, code, message='', account_key=None)
Add logstatus by account
- Parameters:
status (str) – Database account status
code (str) – Status code
message (str, optional) – Message
account_key (str, optional) – Account key
- get_account_key_conf(connection_str, account_key)
get account key conf
- Parameters:
connection_str (str) – connection string
account_key (str) – account key
- Raises:
Exception – dbname not found in configuration: …
Exception – no accounts found in configuration: …
Exception – account_key: … not found in configuration: …
- Returns:
account key conf
- Return type:
dict
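The three documented failure cases of get_account_key_conf can be sketched as successive checks on a nested dictionary. This is a hedged illustration only: the configuration layout (database name, then "accounts", then account key), the dbname argument used in place of connection_str, and the text filled in after each message prefix are assumptions, since the source elides those details.

```python
def get_account_key_conf(configuration, dbname, account_key):
    """Return the configuration of one account, raising on each missing level."""
    if dbname not in configuration:
        raise Exception(f"dbname not found in configuration: {dbname}")
    accounts = configuration[dbname].get("accounts")
    if not accounts:
        raise Exception(f"no accounts found in configuration: {dbname}")
    if account_key not in accounts:
        raise Exception(f"account_key: {account_key} not found in configuration: {dbname}")
    return accounts[account_key]


def error_message(configuration, dbname, account_key):
    """Return the error text for a failing lookup, or None when the lookup succeeds."""
    try:
        get_account_key_conf(configuration, dbname, account_key)
    except Exception as err:
        return str(err)
    return None


# Hypothetical configuration used only for the illustration.
configuration = {
    "data_prim": {"accounts": {"main": {"token": "abc"}}},
    "empty_db": {},
}

conf = get_account_key_conf(configuration, "data_prim", "main")
```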
- get_db_account_keys(connection_str)
Get db account keys
- Parameters:
connection_str (str) – connection string
- Returns:
db account_keys
- Return type:
list
- get_source_default_schema(dwh_db=None)
Get source default schema
- Parameters:
dwh_db (dict, optional) – DWH databases, defaults to None
- Returns:
source default schema
- Return type:
dict
- push_account_logstatus(db_name)
- class forepaas.dwh.DataTransformer
Bases:
object
Utility class used to apply rules to the source data
- Class variable rule_to_function:
dict - mapping the rule to corresponding function
- Class variable condition_to_operator:
dict - mapping the condition to the corresponding operator
- apply_rules(rules, data, error_rules)
Method that filters the dataframe using rules conditions
- Parameters:
rules – list - list of rules to apply
data – dataframe - containing the data
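The idea behind condition_to_operator, a dictionary that maps a rule's condition to the operator applied to the data, can be sketched with the standard operator module. This is an illustration under assumptions and not the class's code: the condition names, the rule fields (column, condition, value) and the use of plain dictionaries instead of a dataframe are all hypothetical.

```python
import operator

# Hypothetical mapping; the real keys used by DataTransformer are not documented here.
CONDITION_TO_OPERATOR = {
    "eq": operator.eq,
    "ne": operator.ne,
    "gt": operator.gt,
    "lt": operator.lt,
}


def apply_rules(rules, rows):
    """Split rows into those passing every rule and those failing at least one."""
    kept, rejected = [], []
    for row in rows:
        ok = all(
            CONDITION_TO_OPERATOR[rule["condition"]](row[rule["column"]], rule["value"])
            for rule in rules
        )
        (kept if ok else rejected).append(row)
    return kept, rejected


# Hypothetical rule and data used only for the illustration.
rules = [{"column": "week_day", "condition": "eq", "value": 1}]
rows = [
    {"week_day": 1, "date": "2020-01-06"},
    {"week_day": 2, "date": "2020-01-07"},
]

kept, rejected = apply_rules(rules, rows)
```

Keeping the mapping in a dictionary means a new condition only needs a new entry, not a new branch in the filtering loop.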
- apply_schema(data, extractor_type, source_default_schema=None)
Method that applies the schema to the data
- Parameters:
data – dataframe - containing the data
extractor_type – str - type of the extractor
source_default_schema – dict - default schema of the source
- Returns:
dataframe - containing the data with the schema applied
- get_schema()
Method that returns the schema of the data
- Returns:
dict - schema of the data
- time_delta_to_datetime(data)
Method that transforms the timedelta columns to datetime
- Parameters:
data – dataframe - containing the data
- Returns:
dataframe - containing the data with the timedelta columns transformed to datetime
- class forepaas.dwh.Database
Bases:
object
- attribute_rules_delete(attribute_id)
- create(content)
- database_tables(database, table)
- get(database)
- list()
- remove(database)
- rules_create(rule)
- table(database, table)
- table_attribute(database, table, attribute_name)
- table_attribute_delete(database, table, attribute_id)
- table_attribute_update(database, table, attribute)
- table_attributes(database, table)
- table_attributes_create(database, table, data)
- table_attributes_delete(database, table)
- table_attributes_update(database, table, data)
- table_rules_list(database, table)
- table_update(database, table, data)
- tables()
- update(database, content)
- class forepaas.dwh.Datalake
Bases:
object
Class to handle Datalake
- Parameters:
configuration (dict) – Configuration sent by the operator containing dwh information about sources and tables
params (dict) – Action params configured in the control plane
- class forepaas.dwh.Datastore(params)
Bases:
object
DATASTORE is an object to interact with a ForePaaS compatible datastore (S3, GoogleStore, Azure blob storage)
- Parameters:
params – dict - params to connect to client store
- Example:
>>> data_store = DATASTORE(config)
>>> bucket = data_store.get_bucket(BUCKET_NAME)
- bucket_exists(name)
Find out if a bucket exists or not
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- client = None
- create_bucket(name)
Method to add a bucket in the datastore
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- get_bucket(name)
Method to get a bucket instance from name
- Parameters:
name – string - Bucket name
- Returns:
Bucket - Bucket instance to handle files
- get_buckets()
Method that gets all buckets from the datastore
- Returns:
list - containing the instance of Bucket
- list(return_type='array')
Method to list buckets from the datastore
- Parameters:
return_type – string - Determine the type you want to get “array” or “str”
- Returns:
str or array - Contains information about buckets on the datastore
- remove_bucket(name)
Method to delete a bucket in the datastore
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- remove_bucket_not_empty(name)
Method to delete a non-empty bucket in the datastore
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- exception forepaas.dwh.DwhRequestException(response)
Bases:
Exception
- class forepaas.dwh.Factory(connection_string, _type)
Bases:
object
Class to instantiate connector/protocol
- Parameters:
connection_string (str) – Connection string used to get the corresponding module adapter.connection/source
_type (str) – Type of connector
params (dict) – Action params configured in the control plane
- decrypt_secret(content)
- get_module(cls)
Import module using ClassName and ModulePath in registered modules
- Raises:
Exception – Import module error
- Returns:
Module Class
- Return type:
Class
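Resolving a class from a registered module path and class name, as get_module is described to do, can be sketched with importlib. This is a minimal sketch and not the Factory's code: the registry content is hypothetical (a standard-library class stands in for a connector), and only the modulePath and className keys come from the description of register_forepaas_modules.

```python
import importlib

# Hypothetical registry: name -> module path and class name.
REGISTERED_MODULES = {
    "ordered": {"modulePath": "collections", "className": "OrderedDict"},
}


def get_module(name, registry=REGISTERED_MODULES):
    """Import the registered module and return the class it exposes."""
    entry = registry[name]
    try:
        module = importlib.import_module(entry["modulePath"])
        return getattr(module, entry["className"])
    except (ImportError, AttributeError) as err:
        # Mirrors the documented behaviour of raising on an import failure.
        raise Exception(f"Import module error: {err}") from err


cls = get_module("ordered")
instance = cls(a=1)
```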
- get_params()
Add source and original_source to params if they are not already there. This makes the protocol/connector work properly
- Returns:
Action parameters
- Return type:
dict
- instanciate(configure=True)
Match the connection_string instance_name with the imported modules, then instantiate the class, configure it and return it
- Returns:
Class instance from the connection string
- Return type:
Instance
- register_forepaas_modules()
Register all connectors and protocols modules inside the forepaas folder
- Returns:
Dictionary of all modules with its modulePath and className
- Return type:
dict
- class forepaas.dwh.Flashzone
Bases:
object
- delete(database, table_name)
- get(database, table_name)
- put(database, table_name, data)
- class forepaas.dwh.GoogleBucket(client, name)
Bases:
object
Bucket is an object to interact with a specific Bucket from ForePaaS Datastore (Google store)
- Parameters:
client – Google storage client
name – Name of the bucket
- Example:
>>> client = google.cloud.storage.Client()
>>> BUCKET_NAME = "example_bucket"
>>> bucket = Bucket(client, BUCKET_NAME)
- delete(object_name)
Delete multiple/single file in Google store’s bucket
- Parameters:
object_name (str or array) – Name of the object(s) to delete
- fcopy_to(name, object_name, object_source, **kwargs)
Copy file from bucket to a new bucket in fp-datastore
- Parameters:
name (str) – Name of the bucket for new object.
object_name (str) – Name of the new object.
object_source (str) – Name of the object to be copied.
**kwargs – remaining keyword arguments are passed to blob.rewrite
- fget(object_name, file_path, **kwargs)
Get an object from Google store’s bucket to a local path
- Parameters:
object_name (str) – Name of the object to get.
file_path (str) – Path on the local filesystem to which the object data will be written.
**kwargs – remaining keyword arguments are passed to blob.download_to_filename
- Returns:
Object stat info from the server
- Return type:
Object
- fput(object_name, file_path, metadata=None, **kwargs)
Put a file to Google store’s bucket
- Parameters:
object_name (str) – Name of the file to upload
file_path (str) – Path on the local filesystem from which file will be read.
**kwargs – remaining keyword arguments are passed to blob.upload_from_filename
- Returns:
Object etag computed by the server
- Return type:
str
- get(file_name, **kwargs)
Get an object from Google store’s bucket by file name
- Parameters:
file_name (str) – File name from bucket you want to get
**kwargs – remaining keyword arguments are passed to blob.download_as_string
- Returns:
Content of the file
- Return type:
Streamer
- Example:
>>> bucket.get('folder/file_name')
>>> data = bucket.get('file_name')
- get_content(file_name, **kwargs)
Get the content of a file from Google store’s bucket
- Parameters:
file_name (str) – File name from bucket you want to get
**kwargs – remaining keyword arguments are passed to blob.download_as_string
- Returns:
Content of the file
- Return type:
str
- list(metadata=False, **kwargs)
List files from Google store’s bucket
- Parameters:
metadata (bool, optional) – Get metadata for all files listed, defaults to False
**kwargs – remaining keyword arguments are passed to bucket.list_blobs
- Raises:
Exception – [description]
- Returns:
List of bucket files
- Return type:
[Object]
- list_filename(return_type='array', contains='', **kwargs)
List file names from Google storage’s bucket
- Parameters:
return_type (str, optional) – Type of return you want, defaults to ‘array’
contains (str, optional) – Filter file list with file_name containing the value, defaults to ‘’
**kwargs – remaining keyword arguments are passed to self.client.list_blobs
- Raises:
Exception – [description]
- Returns:
List of bucket files
- Return type:
[array, str]
- listen_notification(path=None, suffix=None, events=None)
Listen for notifications on a bucket. Filters for prefix, suffix and events can also be provided. No bucket notification needs to be configured beforehand to use this API.
- Parameters:
path (str) – Object key prefix to filter notifications for
suffix (str) – Object key suffix to filter notifications for
events (list) – Enable notifications for specific event types
- Returns:
Iterator of event
- Return type:
iterator
- message_from_error(func, error)
- put(object_name, data, length=None, **kwargs)
Put an object to Google store’s bucket
- Parameters:
object_name (str) – Name of the object to upload
data (str) – Data to upload to google
length (int) – Data length
**kwargs – remaining keyword arguments are passed to blob.upload_from_file
- Returns:
Object etag computed by the server
- Return type:
str
- put_request(url, path='', data=None, method='GET', headers=None)
Get an object from an http request and upload it to Google store’s bucket
- Parameters:
url (str) – Url to get the file from
path (str, optional) – Where to upload the file in Google store’s bucket, defaults to ‘’
data – (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the request.
method (str, optional) – Method used by request to get file, defaults to ‘GET’
headers (dict, optional) – Headers sent in request, defaults to {}
- class forepaas.dwh.GoogleStore(params)
Bases:
Datastore
- bucket_exists(name)
Find out if a bucket exists or not
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- create_bucket(name)
Method to add a bucket in Google store
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- get_bucket(name)
Method to get a bucket instance from name
- Parameters:
name – string - Bucket name
- Returns:
Bucket - Bucket instance to handle files
- get_buckets()
Method that gets all buckets from Google Store
- Returns:
list - containing the instance of Bucket
- get_hadoop_config()
Hadoop config for PySpark
- Returns:
Hadoop config
- Return type:
dict
- get_spark_session_config()
Spark session config for PySpark
- Returns:
Spark session config
- Return type:
dict
- list(return_type='array')
Method to list buckets from Google Store
- Parameters:
return_type – string - Determine the type you want to get “array” or “str”
- Returns:
str or array - Contains information about buckets on Google Store
- message_from_error(func, error)
- metadata_prefix = ''
- remove_bucket(name)
Method to delete a bucket in the Google store
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- remove_bucket_not_empty(name)
Method to delete a non-empty bucket in the Google Store
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- spark_prefix(bucket)
- class forepaas.dwh.LogicalObject
Bases:
object
- create(content)
- get(name)
- list()
- remove(name)
- update(name, content)
- update_status(status)
- class forepaas.dwh.Metas
Bases:
object
- table(table_name, database=None)
Method that returns the table
- Parameters:
table_name – str - table name
- table_attribute(db_name, table_name, attribute_name)
Method that returns the attribute of the table
- Parameters:
db_name – str - database name
table_name – str - table name
attribute_name – str - attribute name
- Returns:
dict - attribute
- table_attribute_delete(db_name, table_name, attribute_name)
Method that deletes the attribute of the table
- Parameters:
db_name – str - database name
table_name – str - table name
attribute_name – str - attribute name
- Returns:
int - status code
- table_attribute_update(db_name, table_name, attribute)
Method that updates the attribute of the table
- Parameters:
db_name – str - database name
table_name – str - table name
attribute – dict - attribute data
- Returns:
int - status code
- table_attributes(db_name, table_name)
Method that returns the list of attributes of the table
- Parameters:
db_name – str - database name
table_name – str - table name
- Returns:
list - list of attributes
- table_update(table_name, data, database=None)
Method that updates the table
- Parameters:
table_name – str - table name
data – dict - table data
- Returns:
int - status code
- tables()
Method that returns the list of tables
- Returns:
list - list of tables
- tables_attributes()
Method that returns the list of attributes of all tables
- Returns:
list - list of attributes
- class forepaas.dwh.Protocol(configuration, params)
Bases:
object
- configure()
- delete(**kwargs)
- get(**kwargs)
- get_error_msg(e, class_name, func, file)
- get_parameter(key, default_value=None)
Search for parameter by key. The params from the job are searched first, then configuration[“parameters”] and configuration. If the key is still not found, table_dwh is searched.
- Parameters:
key (str) – Key to search for
default_value (mixed, optional) – Default value if the key is not found, defaults to None
- Returns:
Value found
- Return type:
mixed
- get_table_parameter(key, default_value=None)
Search for parameter by key in table_dwh or inside table_dwh parameters
- Parameters:
key (str) – Key to search for
default_value (mixed, optional) – Default value if the key is not found, defaults to None
- Returns:
Value found
- Return type:
mixed
- pre_get(to, filename)
- pre_list(path)
- put(**kwargs)
- class forepaas.dwh.S3Bucket(client, name)
Bases:
object
Bucket is an object to interact with a specific Bucket from ForePaaS Datastore (fp-datastore)
- Example:
>>> client = S3('play.min.io:9000')
>>> BUCKET_NAME = "example_bucket"
>>> bucket = Bucket(client, BUCKET_NAME)
- Parameters:
client – S3 - Instance of S3Client
name – Name of the bucket
- Attributes:
S3 – S3 - Instance of S3Client
name – Name of the bucket
- delete(object_name)
Delete one or more files from fp-datastore's bucket
- Parameters:
object_name (str or array) – Name of the object(s) to delete
- fcopy_to(name, object_name, object_source, **kwargs)
Copy file from bucket to a new bucket in fp-datastore
- Parameters:
name (str) – Name of the bucket for new object.
object_name (str) – Name of the new object.
object_source (str) – Name of the object to be copied.
**kwargs – Remaining keyword arguments are passed to self.client.copy_object
- fget(object_name, file_path, **kwargs)
Get an object from fp-datastore's bucket and write it to a local path
- Parameters:
object_name (str) – Name of the object to get.
file_path (str) – Path on the local filesystem to which the object data will be written.
**kwargs – Remaining keyword arguments are passed to self.client.fget_object
- Returns:
Object stat info from the server
- Return type:
Object
- fput(object_name, file_path, **kwargs)
Put a file into fp-datastore's bucket
- Parameters:
object_name (str) – Name of the object to upload.
file_path (str) – Path on the local filesystem from which object data will be read.
**kwargs – Remaining keyword arguments are passed to self.client.fput_object
- Returns:
Object etag computed by the server
- Return type:
str
- get(file_name, **kwargs)
Get an object from fp-datastore's bucket by file name
- Parameters:
file_name (str) – Name of the file to get from the bucket
**kwargs – Remaining keyword arguments are passed to self.client.get_object
- Returns:
Response from the fp-datastore request
- Return type:
urllib3.response.HTTPResponse
- Example:
>>> bucket.get('folder/file_name')
>>> data = bucket.get('file_name')
>>> for d in data.stream(32*1024):
...     sdk.log("info", d)
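Since get returns a urllib3.response.HTTPResponse, the streamed chunks can also be written to a local file instead of being logged. The helper below is a hedged sketch: save_stream is a hypothetical name, and it relies only on the stream(), close() and release_conn() methods of the response object.

```python
def save_stream(response, dest_path, chunk_size=32 * 1024):
    """Write a streamed response to dest_path and return the number of bytes written.

    `response` is expected to behave like urllib3.response.HTTPResponse,
    e.g. the object returned by bucket.get('file_name').
    """
    written = 0
    try:
        with open(dest_path, "wb") as out:
            # Read the body chunk by chunk so large objects never sit fully in memory.
            for chunk in response.stream(chunk_size):
                out.write(chunk)
                written += len(chunk)
    finally:
        # Always close the response and hand the connection back to the pool.
        response.close()
        response.release_conn()
    return written
```

A call such as save_stream(bucket.get('folder/file_name'), '/tmp/file_name') would then copy the object locally; for that plain use case, fget does the same in a single call.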
- get_content(file_name, **kwargs)
Get the content of a file from fp-datastore's bucket
- Parameters:
file_name (str) – Name of the file to get from the bucket
**kwargs – Remaining keyword arguments are passed to self.client.get_object
- Returns:
Content of the file
- Return type:
str
- list(metadata=False, recursive=True, prefix=None, **kwargs)
List files from fp-datastore's bucket
- Parameters:
metadata (bool, optional) – Get metadata for all files listed, defaults to False
recursive (bool, optional) – Whether to list the contents of all folders as well, defaults to True
**kwargs – Remaining keyword arguments are passed to self.client.list_objects_v2
- Raises:
Exception
- Returns:
List of bucket files
- Return type:
[Object]
- list_filename(return_type='array', contains='', recursive=True, prefix=None, **kwargs)
List file names from fp-datastore's bucket
- Parameters:
return_type (str, optional) – Type of return you want, defaults to 'array'
contains (str, optional) – Keep only the files whose name contains this value, defaults to ''
recursive (bool, optional) – Whether to list the contents of all folders as well, defaults to True
**kwargs – Remaining keyword arguments are passed to self.client.list_objects_v2
- Raises:
Exception
- Returns:
List of bucket files
- Return type:
[array, str]
- message_from_error(func, error)
- put(object_name, data, length=None, **kwargs)
Put an object to fp-datastore’s bucket
- Parameters:
object_name (str) – Name of the object to upload
data (io.RawIOBase) – Data to upload to S3
length (int) – Data length
**kwargs – Remaining keyword arguments are passed to self.client.put_object
- Returns:
Object etag computed by the server
- Return type:
str
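Because put expects a binary stream and a length, uploading an in-memory string takes a small amount of preparation. The sketch below assumes a bucket object exposing the put(object_name, data, length=None, **kwargs) signature documented above; put_text is a hypothetical helper, not part of the SDK.

```python
import io


def put_text(bucket, object_name, text, encoding="utf-8", **kwargs):
    """Upload a string as an object and return whatever bucket.put returns."""
    payload = text.encode(encoding)
    # The length is the number of encoded bytes, which can differ from len(text)
    # as soon as the text contains non-ASCII characters.
    return bucket.put(object_name, io.BytesIO(payload), length=len(payload), **kwargs)
```

With a bucket obtained from data_store.get_bucket("test"), put_text(bucket, "hello/note.txt", "café") would send 5 bytes, not 4.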
- put_request(url, path='', data=None, method='GET', headers={})
Get an object from an http request and upload it to fp-datastore’s bucket
- Parameters:
url (str) – Url to get the file from
path (str, optional) – Where to upload the file in fp-datastore’s bucket, defaults to ‘’
data – (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the request.
method (str, optional) – Method used by the request to get the file, defaults to 'GET'
headers (dict, optional) – Headers sent in the request, defaults to {}
- class forepaas.dwh.S3Store(params)
Bases:
Datastore
DATASTORE is an object to interact with ForePaaS Datastore (fp-datastore).
Example:
data_store = DATASTORE(config)
bucket = data_store.get_bucket(BUCKET_NAME)
- Parameters:
params – dict - params to connect to fp-datastore
example (params["secure"] - datastore is ssl secure,) – localhost:9000
example – ROOT
example – ROOT
example – True
- Attributes:
s3 – S3 instance to handle requests to fp-datastore
- bucket_exists(name)
Find out whether a bucket exists
- Parameters:
name – string - Bucket name
- Returns:
bool - True if the bucket exists, False otherwise
- create_bucket(name)
Method to add a bucket in the fp-datastore
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- get_bucket(name)
Method to get a bucket instance from its name
- Parameters:
name – string - Bucket name
- Returns:
Bucket - Bucket instance to handle files
- get_buckets()
Method that gets all buckets from fp-datastore
- Returns:
list - list of Bucket instances
- get_hadoop_config()
Hadoop config for PySpark
- Returns:
Hadoop config
- Return type:
dict
- get_spark_session_config()
Spark session config for PySpark
- Returns:
Spark session config
- Return type:
dict
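One plausible way to use the dict returned by get_spark_session_config is to apply each entry to a PySpark session builder. This is only a sketch under an assumption the reference does not confirm: that the dict maps Spark option names to their values. apply_session_config is a hypothetical helper; it only calls builder.config(key, value), which SparkSession.builder provides.

```python
def apply_session_config(builder, session_config):
    """Apply every key/value pair of session_config to a SparkSession builder.

    Assumes session_config maps Spark option names to values.
    """
    for key, value in session_config.items():
        # builder.config returns the builder, so the calls can be chained.
        builder = builder.config(key, value)
    return builder


# Possible usage (requires pyspark and a connected datastore):
# from pyspark.sql import SparkSession
# builder = apply_session_config(
#     SparkSession.builder.appName("example"),
#     data_store.get_spark_session_config(),
# )
# spark = builder.getOrCreate()
```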
- list(return_type='array')
Method to list buckets from fp-datastore
- Parameters:
return_type – string - Type of the result, either "array" or "str"
- Returns:
str or array - Information about the buckets on fp-datastore
- message_from_error(func, error)
- remove_bucket(name)
Method to delete a bucket in the fp-datastore
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
- remove_bucket_not_empty(name)
Method to delete a non-empty bucket in the fp-datastore
- Parameters:
name – string - Bucket name
- Returns:
bool - Success of operation
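How remove_bucket_not_empty works internally is not documented here. The equivalent manual steps, using only methods listed in this reference, could look like the sketch below. empty_and_remove_bucket is a hypothetical helper, and it assumes that list_filename(return_type="array") returns a list of object names.

```python
def empty_and_remove_bucket(data_store, name):
    """Delete every object of a bucket, then remove the bucket itself."""
    bucket = data_store.get_bucket(name)
    # Assumption: with return_type="array" this is a list of object names.
    names = bucket.list_filename(return_type="array", recursive=True)
    if names:
        # delete accepts a single name or an array of names.
        bucket.delete(names)
    return data_store.remove_bucket(name)
```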
- spark_prefix(bucket)
- class forepaas.dwh.Sources
Bases:
object
- list_database_tables(name)
List tables of a database
- Parameters:
name (str) – name of the database
- Returns:
list of tables
- Return type:
list
- update(name, data, secret=False)
Update a source
- Parameters:
name (str) – name of the source
data (dict) – data of the source
secret (bool, optional) – if True, the data will be encrypted, defaults to False
- Returns:
updated source
- Return type:
dict
- class forepaas.dwh.VirtualAttributes
Bases:
object
- list()
Method that returns the list of virtual attributes
- Returns:
list - list of virtual attributes
- forepaas.dwh.bulk_insert(connector, table, data, source_default_schema={}, batch_size=None, **kwargs)
Insert a dataframe into a physical table using the given connector, splitting the data into small batches
- Parameters:
connector (forepaas.dwh.Connector) – connector class from connect to use for the insertion
table (str) – name of the destination table where the data is going to be loaded in
data (dataframe) – dataframe of data to insert
source_default_schema (dict, optional) – source schema, defaults to {}
batch_size (int, optional) – insert batch size, defaults to the value returned by automatic_batch_size
- Returns:
tuple of the statistics of the insertion
- Return type:
Tuple(stats, error)
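The batching idea behind bulk_insert can be illustrated without the SDK. The sketch below is not the library's implementation: iter_batches is a hypothetical helper that works on a plain list of rows, whereas bulk_insert takes a dataframe and may chunk it differently.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def iter_batches(rows: Sequence[T], batch_size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of rows holding at most batch_size items each."""
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


# Five hypothetical rows split into batches of two.
batches = list(iter_batches([1, 2, 3, 4, 5], 2))
```

Here batches is [[1, 2], [3, 4], [5]]: the last batch simply holds whatever rows remain.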
- forepaas.dwh.connect(connection_str, account_key=None, **kwargs)
Connect to a connector class and return the instantiated class
- Parameters:
connection_str (str) – Address of the connector, protocol or datastore (example: dwh/OBJECT_NAME or dwh/DB_NAME/TABLE_NAME)
account_key (str) – account key
- Returns:
Instance of a connector, protocol, datastore
- Return type:
forepaas.connector.Connector / forepaas.protocol.Protocol / forepaas.datastore.Datastore / forepaas.datastore.Bucket
- forepaas.dwh.extract(connector, params={}, apply_rules=False)
Extract an iterator of dataframes from the connector
- Parameters:
connector (forepaas.dwh.Connector) – Connector returned by connect()
params (dict, optional) – Params sent to extract_dataframe, defaults to {}
apply_rules (bool) – Apply rules from DWH, defaults to False
- Raises:
Exception – Wrong connector usage
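Since extract yields dataframes one at a time, the result is meant to be consumed in a loop rather than measured with len() directly. A minimal sketch of such a loop follows; count_rows is a hypothetical helper, and it only assumes that each yielded chunk supports len(), as a pandas DataFrame does.

```python
def count_rows(chunks):
    """Consume an iterator of chunks and return (number_of_chunks, total_rows)."""
    n_chunks = 0
    total_rows = 0
    for chunk in chunks:
        # Each chunk is processed and can be discarded before the next one is read.
        n_chunks += 1
        total_rows += len(chunk)
    return n_chunks, total_rows


# Possible usage (requires the SDK and a configured source):
# source = connect("dwh/data_prim/chicago_calendar")
# n_chunks, total_rows = count_rows(extract(source, {"condition": "week_day = 1"}))
```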
- forepaas.dwh.get_raw(protocol)
Extract raw file from a protocol and write it in datalake
- Parameters:
protocol (forepaas.dwh.Protocol) – Protocol to use for the extraction
- Returns:
temporary protocol string, or False if something went wrong
- Return type:
string / bool
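Because get_raw returns either a string or False, callers may want to turn the failure case into an exception before using the result. A hedged sketch, with require_raw as a hypothetical helper name:

```python
def require_raw(result):
    """Return the temporary protocol string, or raise if get_raw reported a failure."""
    if not result:
        # get_raw is documented to return False when something went wrong.
        raise RuntimeError("get_raw did not return a temporary protocol string")
    return result


# Possible usage (requires the SDK and a configured protocol):
# protocol = connect("dwh/chicago_files/chicago_calendar.csv")
# raw_location = require_raw(get_raw(protocol))
```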
- forepaas.dwh.update_metas(params=None)
Update metas in the dataplant DWH
- Parameters:
params (dict) – Action params configured in the operator
- Returns:
The update_metas extracted to be sent to the status
- forepaas.dwh.update_table_metas(db_name, table_name, params=None)
Update metas for a specific table in the dataplant DWH
- Parameters:
db_name (str) – database name
table_name (str) – table name
params (dict) – Action params configured in the operator
- Returns:
The update_metas extracted to be sent to the status