MoveAndStoreDataExcel

This process moves an Excel file into the data warehouse.

In the code shown below, replace the place holder values with your values.

    csr.execute("DELETE FROM [stage table] WHERE Processed = 1")
    csr.execute("DROP INDEX IF EXISTS [index name] ON [stage table]")
    sql = "BULK INSERT [stage table view] FROM '" + fileTimeStampedTXT + "' WITH (FIELDTERMINATOR = '|', ROWTERMINATOR = '0x0a', FIRSTROW = 2)"
    csr.execute("CREATE NONCLUSTERED INDEX [index name] ON [stage table]([column name])")

Full script.

#See documenatation for use of this script.
import os
import sys
import datetime
import pandas as pd
import pyodbc as db
import zipfile as zf

#The following four varaibles should be
#populated by configuration
FileExchangeRootDirectory = ''
WarehouseProcessingRootDirectory = ''
BusinessDirectory = ''
ProcessDirectory = ''

DataDirectory = 'In\\'
ArchiveDirectory = 'Archive\\'
FileName = ''
fileTimeStampedTXT = ''

FullSourcePath = os.path.join(FileExchangeRootDirectory, BusinessDirectory, ProcessDirectory, DataDirectory, FileName)
DestinationDirectory = os.path.join(WarehouseProcessingRootDirectory, BusinessDirectory, ProcessDirectory, DataDirectory)
ArchiveDestinationDirectory = os.path.join(WarehouseProcessingRootDirectory, BusinessDirectory, ProcessDirectory, ArchiveDirectory)

print("Starting: Processing data")

# Check if required file exists

if not os.path.isfile(FullSourcePath):

    sys.exit("ERROR: Unable to find file {}".format(FullSourcePath))

# Read excel data into pandas and write | delimited txt file

try:

    print("Reading Excel file: {}".format(FullSourcePath))

    df = pd.read_excel(FullSourcePath, sheet_name="DATASET")

    # set timestamp for file processing

    print("Setting process timestamp")

    processTimeStamp = datetime.datetime.today().strftime('%Y%m%d')

    # Create txt filename wih timestamp

    fileTimeStampedTXT = DestinationDirectory + processTimeStamp + "_data.txt"

    print("Writing txt file to: {}".format(fileTimeStampedTXT))

    df.to_csv(fileTimeStampedTXT, sep="|", index=False)

except Exception as e:
    print(e)


# delete xlsx file

try:

    print("Deleting  xlsx file: {}".format(FullSourcePath))

    if os.path.isfile(FullSourcePath):

        os.remove(FullSourcePath)

except Exception as e:
    print(e)


# bulk load txt file to SQL Server

try:

    print("Connecting to SQL Server database")

    connection_string = 'DSN=ETL;'

    conn = db.connect(connection_string)

    print("Preparing database for update")

    csr = conn.cursor()

    csr.execute("DELETE FROM [stage table] WHERE Processed = 1")

    print("Preparing bulk insert update")

    sql = "BULK INSERT [stage table view] FROM '" + fileTimeStampedTXT + "' WITH (FIELDTERMINATOR = '|', ROWTERMINATOR = '0x0a', FIRSTROW = 2)"

    print("Update database with {} file data.".format(fileTimeStampedTXT))

    csr.execute(sql)

    conn.commit()

    csr.close()

    conn.close()

except Exception as e:
    print(e)



# zip txt file to archive

try:

    zipFile = ArchiveDestinationDirectory + processTimeStamp + "__data.zip"

    print("Creating zip file for txt file archive")

    archive = zf.ZipFile(zipFile, "w")

    archive.write(fileTimeStampedTXT, os.path.basename(fileTimeStampedTXT))

    archive.close

except Exception as e:
    print(e)



# delete txt file

try:

    print("Deleting  txt file: {}".format(fileTimeStampedTXT))

    if os.path.isfile(fileTimeStampedTXT):

        os.remove(fileTimeStampedTXT)

except Exception as e:
    print(e)

# Script Complete

print("Complete: Processing Data")

Last updated