ReloadFromArchive
This script fully reloads a dataset, assuming the archived files are the source of the data.
In the code shown below, replace the placeholder values with your values.
csr.execute("DELETE FROM [stage table] WHERE Processed = 1")
csr.execute("DROP INDEX IF EXISTS [index name] ON [stage table]")
sql = "BULK INSERT [stage table view] FROM '" + tFile + "' WITH (FIELDTERMINATOR = '|', ROWTERMINATOR = '0x0a', FIRSTROW = 2)"
csr.execute("CREATE NONCLUSTERED INDEX [index name] ON [stage table]([column name])")
Full script.
import os
import sys
import glob
import pyodbc as db
import zipfile as zf
# The following three variables should be
# populated by configuration
# Path components that identify the process being reloaded.
WarehouseProcessingRootDirectory = ''
BusinessDirectory = ''
ProcessDirectory = ''

# Sub-directory names, written with a trailing backslash.
DataDirectory = 'In\\'
ArchiveDirectory = 'Archive\\'

# File-name placeholders; both start out empty.
FileName = ''
fileTimeStampedTXT = ''

# Both working directories share the same root/business/process prefix.
_base_parts = (
    WarehouseProcessingRootDirectory,
    BusinessDirectory,
    ProcessDirectory,
)
# Directory the archives are extracted into and the txt files are read from.
DestinationDirectory = os.path.join(*_base_parts, DataDirectory)
# Directory that holds the zip archives.
ArchiveDestinationDirectory = os.path.join(*_base_parts, ArchiveDirectory)
print("Starting: Processing ReloadAchive")

# Get the list of zip files located directly in the archive directory.
# zipFiles is bound to an empty list up front so that the count report and
# the unzip loop that follow still work if the lookup raises.
zipFiles = []
try:
    print("Getting zip files from archive directory {}".format(ArchiveDestinationDirectory))
    # os.path.join builds the pattern whether or not the directory string
    # already ends with a path separator.
    zipFiles = glob.glob(os.path.join(ArchiveDestinationDirectory, "*.zip"), recursive=False)
except Exception as e:
    # Best effort: report the problem and carry on with no archives.
    print(e)

# Report number of zip files found
print("{} zip files found".format(len(zipFiles)))
# Unzip each file in zipFiles into the data directory.
for zFile in zipFiles:
    try:
        print("Unzipping file {}".format(zFile))
        # The context manager closes the archive even when extraction fails,
        # so a bad archive does not leave an open file handle behind.
        with zf.ZipFile(zFile, 'r') as zip_ref:
            zip_ref.extractall(DestinationDirectory)
    except Exception as e:
        # Best effort: report the failure and move on to the next archive.
        print(e)
# Get the list of txt files located directly in the data directory.
# txtFiles is bound to an empty list up front so that the count report and
# the load loop that follow still work if the lookup raises.
txtFiles = []
try:
    print("Getting txt files from directory {}".format(DestinationDirectory))
    # os.path.join builds the pattern whether or not the directory string
    # already ends with a path separator.
    txtFiles = glob.glob(os.path.join(DestinationDirectory, "*.txt"), recursive=False)
except Exception as e:
    # Best effort: report the problem and carry on with no files.
    print(e)

# Report number of txt files found
print("{} txt files found".format(len(txtFiles)))
# Create database connection.
# conn is bound to None first so the name exists even when connecting fails;
# a failed connection is reported here and the script keeps going.
conn = None
try:
    print("Connecting to SQL Server database")
    connection_string = 'DSN=ETL;'
    conn = db.connect(connection_string)
except Exception as e:
    print(e)
# Prepare SQL Server: delete the already-processed rows from the stage table
# and drop its index before the files are loaded.
try:
    print("Preparing database for update")
    csr = conn.cursor()
    try:
        csr.execute("DELETE FROM [stage table] WHERE Processed = 1")
        csr.execute("DROP INDEX IF EXISTS [index name] ON [stage table]")
        conn.commit()
    finally:
        # Close the cursor whether or not the statements succeeded.
        csr.close()
except Exception as e:
    # Best effort: report the failure and continue.
    print(e)
# Load every txt file into the stage table view, then delete the file.
txtFileCount = len(txtFiles)

# n is the 1-based position of the file, used only for progress output.
for n, tFile in enumerate(txtFiles, start=1):
    try:
        print("Processng file {} of {}: Update database with {} file data.".format(n, txtFileCount, tFile))
        # The file name is embedded in the statement as a string literal, so
        # any single quote in the path is doubled to keep the literal intact.
        quotedPath = tFile.replace("'", "''")
        sql = "BULK INSERT [stage table view] FROM '" + quotedPath + "' WITH (FIELDTERMINATOR = '|', ROWTERMINATOR = '0x0a', FIRSTROW = 2)"
        csr = conn.cursor()
        try:
            csr.execute(sql)
            conn.commit()
        finally:
            # Close the cursor whether or not the load succeeded.
            csr.close()
    except Exception as e:
        # Best effort: report the failure and continue with the clean-up.
        print(e)

    # The txt file is removed whether or not the load above succeeded.
    try:
        print("Deleting {} file".format(tFile))
        if os.path.isfile(tFile):
            os.remove(tFile)
    except Exception as e:
        print(e)
# Complete SQL Server processing: recreate the index on the stage table,
# commit, and release the database resources.
try:
    print("Completing SQL Server update")
    print("Creating index on SQL table")
    csr = conn.cursor()
    try:
        csr.execute("CREATE NONCLUSTERED INDEX [index name] ON [stage table]([column name])")
        print("Completing SQL Server update")
        conn.commit()
    finally:
        # Close the cursor whether or not the index was created.
        csr.close()
except Exception as e:
    print(e)
finally:
    # Always attempt to close the connection, even when the step above
    # failed; a problem while closing is reported, not raised.
    try:
        conn.close()
    except Exception as e:
        print(e)

# Complete
print("COMPLETE: Process Reload from Archive")
Last updated