Write to data sources

The following Python 2.7 examples write pandas dataframes to data sources from a Jupyter notebook. To first load data from the data sources, see Add data sources and remote data sets or Access data in relational databases.

Requirement: Writing directly to relational databases requires some Python modules and other vendor-specific libraries. For IBM relational databases, install ibm_db_sa if it is not already available in your notebook:

!pip install --target /user-home/_global_/python-2.7 -U  ibm_db_sa
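
After the install, you can optionally confirm that the module is importable from the notebook (restart the kernel first if the import fails right after installing):

# Optional check: confirm the module can be imported in this kernel
import ibm_db_sa
print("ibm_db_sa is available")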

Python example for Db2 and BIGSQL

import dsx_core_utils, os, io
import pandas as pd
from sqlalchemy import create_engine

#Read csv to pandas
df_data_1 = pd.read_csv('../datasets/userdatap1.csv')

dataSet = dsx_core_utils.get_remote_data_set_info('db2Set')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
# SQLAlchemy URL
sqla_url= "db2+ibm_db://" + dataSource['user']+ ':' + dataSource['password'] + "@9.30.57.224:50000/SAMPLE"

# pandas does not connect to most databases directly, so use the recommended SQLAlchemy engine
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)
conn = engine.connect()

#Write to database
df_data_1.to_sql(dataSet['table'], engine, schema=dataSet['schema'], if_exists='replace')

#Read back again to confirm it has been written
query = 'select * from ' + dataSet['schema'] + '.' + dataSet['table']
df_data_1 = pd.read_sql(query, con=conn)
df_data_1.head()

where db2Set is the name of the data set, 9.30.57.224 is a sample IP address of a Db2 database server, SAMPLE is the example database name, and 50000 is the Db2 non-SSL port number (32051 is usually the Big SQL port number; substitute it for 50000 when connecting to Big SQL). ../datasets/userdatap1.csv is a sample CSV file used to create the dataframe that is then written to a table.
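
For Big SQL, only the connection URL changes. A minimal sketch, assuming a hypothetical Big SQL host bigsql.example.com, the default port 32051, and a database named BIGSQL:

#SQLAlchemy URL for Big SQL (host and database name here are hypothetical)
sqla_url = "db2+ibm_db://" + dataSource['user'] + ':' + dataSource['password'] + "@bigsql.example.com:32051/BIGSQL"
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)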

Python example for Db2 and Db2 Warehouse on Cloud (previously known as dashDB)

import dsx_core_utils, os, io
import pandas as pd
import ibm_db_sa
from sqlalchemy import create_engine
import sqlalchemy

#Read csv to pandas
df1 = pd.read_csv('../datasets/userdatap1.csv')

dataSet = dsx_core_utils.get_remote_data_set_info('dashRDS')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])

# SQLAlchemy URL
sqla_url= "db2+ibm_db://" + dataSource['user']+ ':' + dataSource['password'] + "@dashdb-entry-yp-dal09-07.services.dal.bluemix.net:50000/BLUDB"

# pandas does not connect to most databases directly, so use the recommended SQLAlchemy engine
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)
conn = engine.connect()

# Map pandas dtypes to Db2 Warehouse column types:
# object columns are written as VARCHAR(4000) and bool columns as integers
from sqlalchemy.types import String, Boolean
object_cols = df1.dtypes[df1.dtypes=='object'].index
bool_cols = df1.dtypes[df1.dtypes=='bool'].index
for col in bool_cols:
    df1[col] = df1[col].astype(int)
dashdb_typemap = {col : String(4000) for col in object_cols }

#Write to database
df1.to_sql(name=dataSet['table'], schema=dataSet['schema'], con=engine, if_exists='replace', dtype=dashdb_typemap,index=False)
#Read back again to confirm data has been written
query = 'select * from ' + dataSet['schema'] + '.' + dataSet['table']
df2 = pd.read_sql(query, con=conn)
df2.head()

where dashRDS is the name of the data set, dashdb-entry-yp-dal09-07.services.dal.bluemix.net is a sample hostname of a Db2 Warehouse database server, BLUDB is the example database name, and 50000 is the Db2 non-SSL port number. ../datasets/userdatap1.csv is a sample CSV file used to create the dataframe that is then written to a table.
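
For larger dataframes, the same write can be done in batches by passing the pandas chunksize parameter to to_sql; this is a sketch of the call above with batching added:

# Write in batches of 1000 rows to limit memory use during the insert
df1.to_sql(name=dataSet['table'], schema=dataSet['schema'], con=engine,
           if_exists='replace', dtype=dashdb_typemap, index=False, chunksize=1000)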

Python example for Informix

import dsx_core_utils, jaydebeapi, os, io
import pandas as pd

# Build a pandas dataframe to write (raw_data2 is sample in-memory data prepared earlier)
df2 = pd.DataFrame(raw_data2, columns = ['I', 'I2'])

dataSet = dsx_core_utils.get_remote_data_set_info('info')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])

conn = jaydebeapi.connect(dataSource['driver_class'], [dataSource['URL'], dataSource['user'], dataSource['password']])

#Write to Informix
tablename=dataSet['table']
curs = conn.cursor()
# Create the table first if it does not already exist, for example:
#tablename='sampleTable'
#query = 'create table ' + tablename + '(i int, i2 int)'
#curs.execute(query)

# Insert the dataframe rows
for row in df2.itertuples():
    srows = str(row[1:]).strip("()")
    query2 = 'insert into ' + tablename + ' values(' + srows + ')'
    curs.execute(query2)

query3 = "select * from " + tablename
df1 = pd.read_sql(query3, con=conn)
df1.head(5)

where info is the name of the data set and sampleTable is the name of the table you would like to create.
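
Building the INSERT statement by string concatenation works only for simple numeric data. A safer variant, sketched here under the assumption of the same two integer columns, uses JDBC parameter markers with executemany:

# Parameterized insert: the JDBC driver handles quoting and type conversion
insert_sql = 'insert into ' + tablename + ' values(?, ?)'
rows = [tuple(int(v) for v in row[1:]) for row in df2.itertuples()]
curs.executemany(insert_sql, rows)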

Python example for Netezza

import dsx_core_utils, jaydebeapi, os, io
import pandas as pd

# Read data sources
dataSet = dsx_core_utils.get_remote_data_set_info('netz')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])

conn = jaydebeapi.connect(dataSource['driver_class'], [dataSource['URL'], dataSource['user'], dataSource['password']])

# Write to Netezza (df2 is a dataframe prepared earlier, for example as in the Informix example)
tablename=dataSet['table']
curs = conn.cursor()
# Create the table first if it does not already exist, for example:
#tablename='sampleTable'
#query = 'create table ' + dataSet['schema'] + '.' + tablename + '(i int)'
#curs.execute(query)

# Insert the dataframe rows
for row in df2.itertuples():
    srows = str(row[1:]).strip("()")
    query2 = 'insert into ' + tablename + ' values(' + srows + ')'
    curs.execute(query2)

query3 = "select * from " + tablename
df1 = pd.read_sql(query3, con=conn)
df1.head(5)

where netz is the name of the data set and sampleTable is the table you are writing to.
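
When the write is finished, close the cursor and the JDBC connection to release the session on the server; if your driver does not autocommit, call conn.commit() before closing:

# Release the cursor and the JDBC connection after the write completes
curs.close()
conn.close()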

Python example for Oracle

Requirement: Writing to Oracle requires the Oracle Instant Client libraries. To install them:

kubectl cp -n dsxuser-999 oracle-instantclient12.2-basic-12.2.0.1.0-1.x86_64.rpm <user jupyter pod>:/
rpm -ivh oracle-instantclient12.2-basic-12.2.0.1.0-1.x86_64.rpm
sudo sh -c "echo /opt/oracle/instantclient_12_2 > /etc/ld.so.conf.d/oracle-instantclient.conf"
sudo ldconfig
export LD_LIBRARY_PATH=/opt/oracle/instantclient_12_2:$LD_LIBRARY_PATH

Then in the notebook, set the environment variables:

import os
os.environ["PATH"] = os.environ.get("PATH", "") + ":/usr/lib/oracle/12.2/client64/lib"
os.environ["LD_LIBRARY_PATH"] = os.environ.get("LD_LIBRARY_PATH", "") + ":/usr/lib/oracle/12.2/client64/lib"
os.environ["ORACLE_HOME"] = "/usr/lib/oracle/12.2/client64"

To write a pandas dataframe to the Oracle database:

#Oracle
import dsx_core_utils, os, io
import pandas as pd
from sqlalchemy import create_engine

#Read csv to pandas
df_data_1 = pd.read_csv('../datasets/CUST_HISTORY.csv')
df_data_1.head(5)

dataSet = dsx_core_utils.get_remote_data_set_info('oracle-rds')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])

sqla_url= "oracle+cx_oracle://" + dataSource['user']+ ':' + dataSource['password'] + "@9.87.654.321:1521/xe"

#We use recommended sqlalchemy
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)
conn = engine.connect()

#Write to database (make sure the database user has write access and, if the table must be replaced, drop/create privileges)
#if_exists='replace' drops and recreates the table if it exists; if_exists='append' appends, creating the table if it does not exist
df_data_1.to_sql(dataSet['table'], engine, schema=dataSet['schema'], if_exists='replace')

#Read back again to confirm it has been written
query = 'select * from '+dataSet['schema']+'.'+dataSet['table']
df_data_1 = pd.read_sql(query, con=conn)
df_data_1.head()

where oracle-rds is the name of the data set, 9.87.654.321 is an example IP address of the Oracle database host, and xe is the sample SID of the database.
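
If your Oracle database is addressed by a service name rather than a SID, the SQLAlchemy cx_Oracle dialect accepts it as a query parameter. A sketch, with a hypothetical service name ORCLPDB1:

#SQLAlchemy URL using a service name instead of a SID (service name here is hypothetical)
sqla_url = "oracle+cx_oracle://" + dataSource['user'] + ':' + dataSource['password'] + "@9.87.654.321:1521/?service_name=ORCLPDB1"
engine = create_engine(sqla_url, pool_size=10, max_overflow=20)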

Python example for HDFS

import dsx_core_utils, requests, os, io
import pandas as pd
from sqlalchemy import create_engine

dataSet = dsx_core_utils.get_remote_data_set_info('hdfsset1')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])

filePath='../datasets/userdatap1.csv'
destFilePath='/user/user1/userdatap2.csv'
url = (dataSource['URL'][:-1] if (dataSource['URL'].endswith('/')) else dataSource['URL']) + destFilePath + "?op=CREATE"
headers = {"Authorization": os.environ.get('DSX_TOKEN')}
response = requests.request("PUT", url, headers=headers, timeout=10, verify=False, allow_redirects=True, data=open(filePath, 'rb').read())

where hdfsset1 is the name of the data set, /user/user1/userdatap2.csv is the destination CSV file name on HDFS, timeout=10 is the number of seconds to wait for the write to complete, and ../datasets/userdatap1.csv is the source CSV file on DSX Local. See the WebHDFS documentation for more details on file overwrite.
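
By default, the WebHDFS CREATE operation fails if the destination file already exists. To replace an existing file, add the overwrite=true query parameter to the same request:

# Add overwrite=true so an existing file at destFilePath is replaced
url = (dataSource['URL'][:-1] if (dataSource['URL'].endswith('/')) else dataSource['URL']) + destFilePath + "?op=CREATE&overwrite=true"
response = requests.request("PUT", url, headers=headers, timeout=10, verify=False, allow_redirects=True, data=open(filePath, 'rb').read())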

Python example for a custom JDBC data source

If you have added support for a specific database vendor, you can write to it using the following example.

import dsx_core_utils, jaydebeapi, os, io
import pandas as pd

# Build a pandas dataframe to write, for example:
#df2 = pd.DataFrame(raw_data2, columns = ['I', 'I2'])

dataSet = dsx_core_utils.get_remote_data_set_info('custom-dbset')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])

conn = jaydebeapi.connect(dataSource['driver_class'], [dataSource['URL'], dataSource['user'], dataSource['password']])

#Write to the custom database
tablename=dataSet['table']
curs = conn.cursor()
# Create the table first if it does not already exist, for example:
#tablename='sampleTable'
#query = 'create table ' + tablename + '(i int, i2 int)'
#curs.execute(query)

# Insert the dataframe rows
for row in df2.itertuples():
    srows = str(row[1:]).strip("()")
    query2 = 'insert into ' + tablename + ' values(' + srows + ')'
    curs.execute(query2)

query3 = "select * from " + tablename
df1 = pd.read_sql(query3, con=conn)
df1.head(5)

where custom-dbset is the name of the data set and sampleTable is the name of the table you would like to create.
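
If you want repeated notebook runs to replace the table contents rather than append to them, you can clear the table before the insert loop (a sketch; adjust the SQL to your vendor's dialect):

# Remove any existing rows so a re-run does not duplicate data
curs.execute('delete from ' + tablename)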