Write to data sources

The following Python 2.7 examples write Pandas dataframes to data sources from a Jupyter notebook.

To first load data from the data sources, see Add data sources and remote data sets or Access data in relational databases.

Restriction: Writing directly to relational databases requires certain Python modules and other vendor-specific libraries. For IBM relational databases, install ibm_db_sa if it is not already available in your notebook:

!pip install --target /user-home/_global_/python-2.7 -U ibm_db_sa
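
After the installation completes, restart the notebook kernel if the module was not previously available. The following minimal check confirms that the driver stack is importable; the print statement is only illustrative.

#Confirm that the database driver and SQLAlchemy are importable (illustrative check only)
import ibm_db_sa
import sqlalchemy
print(sqlalchemy.__version__)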

Python example for Db2 and BIGSQL

import dsx_core_utils, os, io
import pandas as pd
from sqlalchemy import create_engine

#Read csv to pandas
df_data_1 = pd.read_csv('../datasets/userdatap1.csv')

dataSet = dsx_core_utils.get_remote_data_set_info('db2Set')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
#SQL Alchemy URL
sqla_url= "db2+ibm_db://" + dataSource['user']+ ':' +
dataSource['password'] + "@9.30.57.224:50000/SAMPLE"

#Pandas does not support many databases so we use recommended
sqlalchemy
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)
conn = engine.connect()

#Write to database
df_data_1.to_sql(dataSet['table'], engine, schema=dataSet['schema'], if_exists='replace')

#Read back again to confirm it has been written
query = 'select * from ' + dataSet['schema'] + '.' + dataSet['table']
df_data_1 = pd.read_sql(query, con=conn)
df_data_1.head()

where db2Set is the name of the data set, 9.30.57.224 is a sample IP address of a Db2 database server, SAMPLE is the example database name, and 50000 is the Db2 non-SSL port number (for BIGSQL, replace 50000 with the BIGSQL port number, which is usually 32051). ../datasets/userdatap1.csv is a sample CSV file that is used to create the dataframe that is written to the table.
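
For a BIGSQL target, only the connection URL changes. The following minimal sketch assumes a BIGSQL server at the same sample IP address, the usual BIGSQL port 32051, and the same example database name; substitute your own values.

#Example SQLAlchemy URL for BIGSQL; the host, port, and database name are placeholders
sqla_url = "db2+ibm_db://" + dataSource['user'] + ':' + dataSource['password'] + "@9.30.57.224:32051/SAMPLE"
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)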

Python example for Db2 and Db2 Warehouse on Cloud (previously known as dashDB)

import dsx_core_utils, os, io
import pandas as pd
import ibm_db_sa
from sqlalchemy import create_engine
import sqlalchemy

#Read csv to pandas
df1 = pd.read_csv('../datasets/userdatap1.csv')

dataSet = dsx_core_utils.get_remote_data_set_info('dashRDS')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])

#SQL Alchemy URL
sqla_url= "db2+ibm_db://" + dataSource['user']+ ':' +
dataSource['password'] +
"@dashdb-entry-yp-dal09-07.services.dal.bluemix.net:50000/BLUDB"

#Pandas does not support many databases so we use recommended
sqlalchemy
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)
conn = engine.connect()

#dashDB map types
from sqlalchemy.types import String, Boolean
object_cols = df1.dtypes[df1.dtypes=='object'].index
bool_cols = df1.dtypes[df1.dtypes=='bool'].index
for col in bool_cols:
    df1[col] = df1[col].astype(int)
dashdb_typemap = {col : String(4000) for col in object_cols }

#Write to database
df1.to_sql(name=dataSet['table'], schema=dataSet['schema'], con=engine, if_exists='replace', dtype=dashdb_typemap, index=False)
#Read back again to confirm data has been written
query = 'select * from ' + dataSet['schema'] + '.' + dataSet['table']
df2 = pd.read_sql(query, con=conn)
df2.head()

where dashRDS is the name of the data set, dashdb-entry-yp-dal09-07.services.dal.bluemix.net is the sample hostname of a Db2 Warehouse database server, BLUDB is the example database name, and 50000 is the Db2 non-SSL port number. ../datasets/userdatap1.csv is a sample CSV file that is used to create the dataframe that is written to the table.
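
Instead of hard-coding the hostname, port, and database name, you can derive them from the JDBC URL that is stored in the data source, as the SSL example later in this topic does. The following minimal sketch assumes that the URL has the usual jdbc:db2://host:port/DATABASE form.

#Build the SQLAlchemy URL from the data source's JDBC URL (assumes jdbc:db2://host:port/DATABASE)
dbServer = "@" + dataSource['URL'].split('/')[2] + '/' + dataSource['URL'].split('/')[3].split(':')[0]
sqla_url = "db2+ibm_db://" + dataSource['user'] + ':' + dataSource['password'] + dbServer
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)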

Python example for Informix

import dsx_core_utils, jaydebeapi, os, io
import pandas as pd

#Create a sample pandas dataframe to write (replace this with your own data)
raw_data2 = {'I': [1, 2, 3], 'I2': [10, 20, 30]}
df2 = pd.DataFrame(raw_data2, columns = ['I', 'I2'])

dataSet = dsx_core_utils.get_remote_data_set_info('info')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])

conn = jaydebeapi.connect(dataSource['driver_class'],
[dataSource['URL'], dataSource['user'], dataSource['password']])

#Write to informix
tablename=dataSet['table']
# Create table
#tablename='sampleTable'
#query = 'create table ' + tablename+'(i int, i2 int)'
#curs = conn.cursor()
#curs.execute(query)

# Insert the dataframe rows
curs = conn.cursor()
for row in df2.itertuples():
    srows = str(row[1:]).strip("()")
    query2 = 'insert into ' + tablename + ' values('+srows+')'
    curs.execute(query2)

query3 = "select * from " + tablename
df1 = pd.read_sql(query3, con=conn)
df1.head(5)

where info is the name of the data set and sampleTable is the name of the table you would like to create.
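
Building INSERT statements by string concatenation works for simple numeric data, but for larger dataframes or values that contain quotes, parameterized inserts are safer and faster. The following minimal sketch uses jaydebeapi's DB-API parameter binding and assumes that the target table already exists with columns that match df2.

#Insert all rows in one batch by using qmark-style parameters (assumes the table already exists)
curs = conn.cursor()
placeholders = ', '.join(['?'] * len(df2.columns))
insert_sql = 'insert into ' + tablename + ' values(' + placeholders + ')'
curs.executemany(insert_sql, [tuple(row) for row in df2.itertuples(index=False)])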

Python example for Netezza

import dsx_core_utils, jaydebeapi, os, io
import pandas as pd

# Read data sources
dataSet = dsx_core_utils.get_remote_data_set_info('netz')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])

conn = jaydebeapi.connect(dataSource['driver_class'],
[dataSource['URL'], dataSource['user'], dataSource['password']])

# Write to Netezza
tablename=dataSet['table']
# Create table if it does not exist
#tablename='sampleTable'
#query = 'create table ' + dataSet['schema'] + '.' + tablename + '(i int)'
#curs = conn.cursor()
#curs.execute(query)

# Insert the dataframe rows (df2 is a pandas dataframe, for example one created as in the Informix example)
curs = conn.cursor()
for row in df2.itertuples():
    srows = str(row[1:]).strip("()")
    query2 = 'insert into ' + tablename + ' values('+srows+')'
    curs.execute(query2)

query3 = "select * from " + tablename
df1 = pd.read_sql(query3, con=conn)
df1.head(5)

where netz is the name of the data set and sampleTable is the table you are writing to.
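
Depending on the JDBC driver's autocommit setting, the inserted rows might not be visible to other sessions until the transaction is committed. The following minimal cleanup sketch assumes the conn and curs objects from the example above.

#Commit the inserts (a no-op if the driver autocommits) and release resources
conn.commit()
curs.close()
conn.close()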

Python example for Oracle

Restriction: Oracle requires client libraries for the write to work. To install the library:

kubectl cp -n dsxuser-999 oracle-instantclient12.2-basic-12.2.0.1.0-1.x86_64.rpm <user jupyter pod>:/
rpm -ivh oracle-instantclient12.2-basic-12.2.0.1.0-1.x86_64.rpm
sudo sh -c "echo /opt/oracle/instantclient_12_2 > /etc/ld.so.conf.d/oracle-instantclient.conf"
sudo ldconfig
export LD_LIBRARY_PATH=/opt/oracle/instantclient_12_2:$LD_LIBRARY_PATH

Then in the notebook, set the environment variables:

os.environ["PATH"]=os.environ["PATH"]+
":/usr/lib/oracle/12.2/client64/lib"
os.environ["LD_LIBRARY_PATH"]=os.environ["LD_LIBRARY_PATH"]+
":/usr/lib/oracle/12.2/client64/lib"
os.environ["ORACLE_HOME"]="/usr/lib/oracle/12.2/client64"

To write a pandas dataframe to the Oracle database:

#Oracle
import dsx_core_utils, os, io
import pandas as pd
from sqlalchemy import create_engine

#Read csv to pandas
df_data_1 = pd.read_csv('../datasets/CUST_HISTORY.csv')
df_data_1.head(5)

dataSet = dsx_core_utils.get_remote_data_set_info('oracle-rds')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])

sqla_url= "oracle+cx_oracle://" + dataSource['user']+ ':' +
dataSource['password'] + "@9.87.654.321:1521/xe"

#We use recommended sqlalchemy
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)
conn = engine.connect()

#Write to database (make sure the database user has write access and, where the table does not exist, drop/create privileges)
#if_exists='replace' drops and recreates the table if it already exists; if_exists='append' appends to an existing table and creates it if it does not exist
df_data_1.to_sql(dataSet['table'], engine, schema=dataSet['schema'], if_exists='replace')

#Read back again to confirm it has been written
query = 'select * from '+dataSet['schema']+'.'+dataSet['table']
df_data_1 = pd.read_sql(query, con=conn)
df_data_1.head()

where oracle-rds is the name of the data set, 9.87.654.321 is an example IP of the Oracle database host, and xe is the sample sid of the database.
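
If your Oracle database is addressed by a service name rather than a SID, the SQLAlchemy cx_Oracle URL takes a slightly different form. The following is a minimal sketch in which the host, port, and service name are placeholder values.

#Example SQLAlchemy URL that uses an Oracle service name instead of a SID (placeholder values)
sqla_url = "oracle+cx_oracle://" + dataSource['user'] + ':' + dataSource['password'] + "@9.87.654.321:1521/?service_name=myservice"
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)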

Python example for HDFS

import dsx_core_utils, requests, os, io
import pandas as pd
from sqlalchemy import create_engine

dataSet = dsx_core_utils.get_remote_data_set_info('hdfsset1')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])

filePath='../datasets/userdatap1.csv'
destFilePath='/user/user1/userdatap2.csv'
url = (dataSource['URL'][:-1] if dataSource['URL'].endswith('/') else dataSource['URL']) + destFilePath + "?op=CREATE"
headers = {"Authorization": os.environ.get('DSX_TOKEN')}
response = requests.request("PUT", url, headers=headers, timeout=10, verify=False, allow_redirects=True, data=open(filePath, 'rb').read())

where hdfsset1 is the name of the data set, /user/user1/userdatap2.csv is the destination CSV file name on HDFS, timeout=10 is the number of seconds to wait for the write to complete, and ../datasets/userdatap1.csv is the source CSV file on Watson Studio Local. See the WebHDFS documentation for more details on overwriting files.
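
To confirm the upload, you can read the file back through WebHDFS and load it into pandas. The following minimal sketch reuses the data source URL and headers from the example above and uses the WebHDFS OPEN operation.

#Read the file back from HDFS to confirm the write (WebHDFS OPEN operation)
read_url = (dataSource['URL'][:-1] if dataSource['URL'].endswith('/') else dataSource['URL']) + destFilePath + "?op=OPEN"
check = requests.request("GET", read_url, headers=headers, timeout=10, verify=False, allow_redirects=True)
df_check = pd.read_csv(io.StringIO(check.text))
df_check.head()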

Python example for a custom JDBC data source

If you have added support for a specific database vendor, you can write to it by using the following example.

import dsx_core_utils, jaydebeapi, os, io
import pandas as pd

#Create a sample pandas dataframe to write (replace this with your own data)
raw_data2 = {'I': [1, 2, 3], 'I2': [10, 20, 30]}
df2 = pd.DataFrame(raw_data2, columns = ['I', 'I2'])

dataSet = dsx_core_utils.get_remote_data_set_info('custom-dbset')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])

conn = jaydebeapi.connect(dataSource['driver_class'],
[dataSource['URL'], dataSource['user'], dataSource['password']])

#Write to custom database
tablename=dataSet['table']
# Create table
#tablename='sampleTable'
#query = 'create table ' + tablename+'(i int, i2 int)'
#curs = conn.cursor()
#curs.execute(query)

# Insert the dataframe rows
curs = conn.cursor()
for row in df2.itertuples():
    srows = str(row[1:]).strip("()")
    query2 = 'insert into ' + tablename + ' values('+srows+')'
    curs.execute(query2)

query3 = "select * from " + tablename
df1 = pd.read_sql(query3, con=conn)
df1.head(5)

where custom-dbset is the name of the data set and sampleTable is the name of the table you would like to create.

Python example for writing Pandas dataframes over SSL

The following Python 2.7 example writes a Pandas dataframe over SSL, where dashsslset is the name of the remote data set. This code appends ;Security=ssl; to the JDBC URL.

Requirement: You must upgrade the ibm-db Python package to version 2.0.8 or later, for example: !pip install --user ibm-db==2.0.9 --upgrade. The SSL certificate must also already be imported.

import dsx_core_utils, os, io
import pandas as pd
import ibm_db_sa
from sqlalchemy import create_engine
import sqlalchemy

dataSet = dsx_core_utils.get_remote_data_set_info('dashsslset')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
dbServer = "@" + dataSource['URL'].split('/')[2] + '/' + dataSource['URL'].split('/')[3].split(':')[0]

#SQL Alchemy URL
sqla_url= "db2+ibm_db://" + dataSource['user']+ ':' + dataSource['password'] + dbServer + ";Security=ssl;"
print sqla_url
#Pandas does not support many databases so we use recommended sqlalchemy
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)
conn = engine.connect()

#dashDB map types (df3 is the pandas dataframe that you want to write)
from sqlalchemy.types import String, Boolean
object_cols = df3.dtypes[df3.dtypes=='object'].index
bool_cols = df3.dtypes[df3.dtypes=='bool'].index
for col in bool_cols:
    df3[col] = df3[col].astype(int)
dashdb_typemap = {col : String(4000) for col in object_cols }

#Write to database
df3.to_sql(name=dataSet['table'], schema=dataSet['schema'], con=engine, if_exists='append', dtype=dashdb_typemap,index=False)
#Read back again to confirm data has been written
query = 'select * from ' + dataSet['schema'] + '.' + dataSet['table']
df4 = pd.read_sql(query, con=conn)
df4.head()

Python example for Db2 Warehouse on Cloud (previously known as dashDB) over SSL

The following example code writes Spark dataframes over a remote source connection to Db2 Warehouse on Cloud by using an SSL URL, without a toPandas() conversion. It must run in Spark 2.2.0 or later (the Jupyter notebook for Python 3.5 uses Spark 2.2.1). Insert your own dataframe and data set name at the beginning of the example, and customize the rest if your Db2 Warehouse on Cloud installation is case sensitive. The example code assumes that your table name and schema are uppercase, and any new table that this code generates also has uppercase column names.

import dsx_core_utils, requests, os, io
from pyspark.sql import SparkSession
import dsx_core_utils, re, jaydebeapi
# create spark context
spark = SparkSession.builder.getOrCreate()

outputDF = <your spark df>
dataset_name ="<your remote dataset name>"

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

vector_cols = (c[0] for c in outputDF.dtypes if 'vector' in c[1] or 'array' in c[1])
# convert vector columns of the form [num1, num2, num3] to a string so that they can be saved in dashDB column types
# customize the conversion for your own complex object type in your Spark frame
vector_cols_list = list(vector_cols)
if len(vector_cols_list) > 0 :
    print ("Converting Vector type to String type. The following columns are converted")
    convert_udf = udf(lambda x: ','.join(map(str,x)), StringType())
    for a_vector_col in vector_cols_list:
        print (a_vector_col)
        outputDF = outputDF.withColumn(a_vector_col, convert_udf(col(a_vector_col)).cast(StringType()))

# Normalize column names in case a valid column name is not provided; customize your own column naming convention
for col_name in outputDF.schema.names:
    if col_name[0].isdigit() or col_name[0] == '_':
        outputDF = outputDF.withColumnRenamed(col_name, "p_" + col_name)

# the schema should now be ready for dashDB; print it for the user
print ("\nShowing Schema and Sample Data After Schema Modification for Importing to DashDB:")
outputDF.printSchema()
outputDF.show(1)

print ("Writing Spark Frame to DashDB Now:")
def get_column_names(dataframe):
    cols=""
    # customize your own varchar maxSize if this is not big enough
    maxSize = 256
    for name, dtype in dataframe.dtypes:
        if dtype == "string":
            cols = cols + name.upper() + " " + "VARCHAR(" + str(maxSize) + "), "
        else:
            cols = cols + name.upper() + " " + dtype.upper() + ", "
    cols = cols.rstrip(', ')
    return cols
def extract_dns_from_url(db_url):
    url_chunks = re.split('//|@', db_url)
    return url_chunks[1] if len(url_chunks) > 1 else 'null'

def get_connection_definition_by_dataset(dataset_name):
    # extract data source info from dataset_name
    dataset_info = dsx_core_utils.get_remote_data_set_info(dataset_name)
    data_source_info = dsx_core_utils.get_data_source_info(dataset_info["datasource"])

    # optionally replace dataset_info schema and table from env var
    schema_table = os.getenv("DEF_DSX_DATASOURCE_OUTPUT_SCHEMA_TABLE", None)
    if schema_table:
        schema_table = schema_table.split('.')
        if len(schema_table) == 2:
            dataset_info['schema'] = schema_table[0]
        dataset_info['table'] = schema_table[-1]

    return dataset_info, data_source_info

def write_to_data_source(dataframe, dataset_name):
    dataset_info, data_source_info = get_connection_definition_by_dataset(dataset_name)

    url = "jdbc:db2://" + extract_dns_from_url(data_source_info['URL'])
    
    user = data_source_info['user']
    password = data_source_info['password']

    conn_properties = {"user":user,"password":password}
    schema = dataset_info['schema'].strip().upper()
    table_name = dataset_info['table'].upper()
    table_fullname = (schema + "." + table_name) if (len(schema) != 0) else table_name
    
    cols=get_column_names(dataframe)
    print ("Writing to Table: " + table_fullname + " (" + cols + ")")

    try:
        dataframe.write.option("createTableColumnTypes", cols).jdbc(url=url, table=table_fullname, properties = conn_properties)
        print ("Create Table and Write ... Done")
    except: 
        dataframe.write.jdbc(url, table=table_fullname, properties = conn_properties, mode="append")
        print ("Write ... Done")

write_to_data_source(outputDF, dataset_name)