Data Warehouse ETL Framework
  • Introduction To The ETL Framework
  • Framework Overview
    • Framework Description
    • Why You Need An ETL Framework
      • What is the ETL Framework?
      • Framework Objectives
      • Can I use my own resources to implement The Framework?
    • Download The Framework
    • What's In The Box
    • v1.0 Release
    • Training Resources For Engineers
    • Getting Help
  • General Principles
    • Never Finished
    • Overall Engineering Philosophy
    • Self Service Analytics As The Goal
    • Strict Adherence To Globally Accepted Practice
    • Data Flow Design
    • Effective DataOps
    • Code First Philosophy
    • Melding Kimball And Inmon
    • Master Data Management
    • Open Architecture
    • Decoupled Systems
    • Independent Identically Executing Processes
    • Robust Processes
    • Self-Diagnostics
    • Data Provenance
    • Adverse Reporting
    • Error Recovery
    • Archival And Recovery
    • Documentation As A Necessity
  • DataOps For The Uninitiated
    • DataOps Initiation
    • What Is Agile Software Development?
    • What Is DevOps?
    • What Is Statistical Process Control?
    • You Take All That And You Wind Up With...
    • The DataOps Manifesto
    • Want To Learn More About DataOps?
  • ETL Developer's Field Guide
    • The Plain English Explanation Of What You Are About To Build
    • Data Warehouse User Profiles
    • Using Data Source Names
    • Why You Should Not Use SSIS
      • SSIS Is Not Forward Compatible
      • SSIS Is Not Very Performant
      • Python Development Is Faster Than SSIS
      • SSIS Is Difficult To Deploy To Production
      • SSIS Is Difficult To Maintain
    • Python Software Engineering Considerations
    • Enterprise Data Ecosystem
      • EDW's Place In Enterprise Architecture
      • EDW ETL Overview
    • ETL Environment Databases
    • Data Acquisition Paradigms
      • Demand Pull
      • Supply Push
    • The Common Model
    • The Semantic Layer
    • Database Object Naming Conventions
      • Columns
      • Indexes
      • Primary Keys
      • Schemas
      • Stored Procedures
      • Tables
      • User Defined Functions
    • ETL Reference Architecture
    • Table Anatomy
      • Sample Dimension Table
      • Sample Fact Table
      • Sample Indexed View
      • Sample Junk Dimension Table
      • Sample Master Data Management Table
      • Sample Stage Table
    • Master Data Management
      • MDM With Exact Match On Source System Key
      • Implementation Details
    • Slowly Changing Dimension Processing
      • General Process
      • Creating Empty Records
      • SCD Processing Types
      • Implementing SCDs As Temporal Tables
    • Handling Calculated Values
    • Warehouse Load Commandments
    • Code Style Guide
      • SSIS
      • Transact SQL
      • Python
      • C#
    • The Role Of Schemas
      • Standard Schemas And Their Definitions
      • Schemas As A Security Device
      • Schemas As A DB Object Differentiator
    • Feedback And Control Systems
      • Having An Engineering Mindset
      • The Mechanics Of Checking For Unprocessed Records
      • Time Series Analysis 101
      • Passive Monitoring System Design Theory
      • Passive Monitoring System Implementation
    • Source Control
    • Loading Historical Data
    • Security Access Model
    • Wrapping Up
  • ETL Environment Set Up
    • Install Anaconda
    • Create Initial Source Control Folders
    • Create Primary Databases And Schemas
    • Customize And Run Deployment Scripts
    • Install Python Packages
    • Create Data Source Name
    • Create The SSIS Catalogue
    • Create The File I/O Directory Structure
    • Create Global Environment In The Integration Services Catalog
    • Create The SQL Server Aliases
    • Create The BIAnalytics Proxy Account
    • Create The Opt Folder
    • Configure Email Alerts
  • Standard SSIS ETL Development Package
    • Starting A New Process
    • Tracking Package Variables
    • Remove Things You Do Not Need
    • Creating New Configuration Settings
  • Sample Script Guide
    • SQL
      • Finalize And Audit Scripts
        • 18 CREATE PROC usp_RecordRowCounts
        • 19 CREATE PROC usp_MarkRecordsAsProcessed
        • 20 CREATE PROC usp_CheckForUnprocessedRecords
      • Monitoring Scripts
        • 21 CREATE PROC usp_DisplayTablesNotLoading
        • 22 CREATE PROC usp_LoadTableLoadReportingTable
        • 23 CREATE PROC usp_TableLoadMonitoring
      • Table Object Sample Scripts
        • sample type I dimension table
        • sample type II dimension table
        • sample fact table
        • sample indexed view
        • sample junk dimension table
        • sample master data management table
        • sample stage table
      • Data Processing Sample Scripts
        • Data Pull
        • batch processing
        • bulk insert
        • data cleansing
        • fact table load
        • remove dups
        • type I dimension processing
        • type II dimension processing
      • Helper Scripts
        • Create Database
        • add rowhash
        • change collation
        • configuration insert sample script
        • documentation block
        • populate fact table with fake data
        • proc execution scripts
        • show all columns in database
        • troubleshooting
        • util
        • start an agent job from T-SQL
    • Python
      • Building Blocks
        • CombineCSVsIntoOneFile
        • ConvertCommaToPipeDelimitedFile
        • ConvertExcelToCSV
        • LoopingOverFilesInADirectory
        • Process Zip File
        • QueryDatabaseAndWriteSmallFile
        • QueryDatabaseAndWriteLargeFile
        • SendEmail
        • StringMatching
        • YAMLConfigImport
      • Full Solutions
        • DownloadMoveAndStoreDataCSV
        • LoadLargeCSVsIntoDataWarehouseStagingTables
        • MoveAndStoreDataExcel
        • ReloadFromArchive
      • Jupyter Notebooks
        • Passive Monitoring System Design Theory
    • PySpark
      • ConnectToSpark
      • LoadCSV
      • ExampleDataProcessing
    • Windows Batch
  • A Methodology To Rapidly Convert OLTP Databases to OLAP Solutions
    • Step 1: Find The Nouns
    • Step 2: Find The Stuff We Want To Do Math On
    • Step 3: Analyze Relationships
  • Data Model Creation Tool
    • Record Count
    • Sample Data
    • Create Stage Table
    • Char Length Analysis
    • Column Notes
    • Column Cleansing Notes
    • Source To Target Mapping
    • Dimension List
    • Fact Table Creation Helper
    • Foreign Key Creation
    • Process Fact Script Helper
    • View Creation Helper
    • Date Role Play View Helper
    • Data Model View Creation Helper
    • List Population Values
  • Performance Monitoring
    • The Daily Chore
    • Diagnostic Tools
      • Using The Built-In Stored Procs
      • Using The Built-In Views
    • Logging Database Diagram
  • Data Warehouse Troubleshooting Guide
    • Generalized Troubleshooting Steps
  • Business Analytics Capability Maturity Model
    • What Is A Business Analytics Capability Maturity Model?
    • Level 0. Operational Reporting
    • Level 1. Rapid Delivery
    • Level 2. Self Service
    • Level 3. Central Repository
    • Level 4. Open Data 1
    • Level 5. Open Data 2
    • Level 6. Feedback 1
    • Level 7. Data Archaeology
    • Level 8. Crystal Ball
    • Level 9. Feedback 2
    • Level 10. Oil Rig
    • Level 11. The Singularity
  • Appendices
    • Appendix A. What is Medium Data®?
    • Appendix B. The Benefits Of Using Python And T-SQL Over SSIS For ETL

ReloadFromArchive

This script fully reloads a dataset from archive when flat files are the source of the data. It unzips the archived files, clears already-processed rows from the stage table, drops the stage table index, bulk inserts each file, and recreates the index when the load completes.

In the code shown below, replace the placeholder values with your own values.

csr.execute("DELETE FROM [stage table] WHERE Processed = 1")
csr.execute("DROP INDEX IF EXISTS [index name] ON [stage table]")
sql = "BULK INSERT [stage table view] FROM '" + tFile + "' WITH (FIELDTERMINATOR = '|', ROWTERMINATOR = '0x0a', FIRSTROW = 2)"
csr.execute("CREATE NONCLUSTERED INDEX [index name] ON [stage table]([column name])")

Full script.

import os
import sys
import glob
import pyodbc as db
import zipfile as zf

# The following three variables should be
# populated by configuration
WarehouseProcessingRootDirectory = ''
BusinessDirectory = ''
ProcessDirectory = ''

DataDirectory = 'In\\'
ArchiveDirectory = 'Archive\\'

DestinationDirectory = os.path.join(WarehouseProcessingRootDirectory, BusinessDirectory, ProcessDirectory, DataDirectory)
ArchiveDestinationDirectory = os.path.join(WarehouseProcessingRootDirectory, BusinessDirectory, ProcessDirectory, ArchiveDirectory)

print("Starting: Processing ReloadAchive")

# get list of zip files in the Archive directory
try:
    print("Getting zip files from archive directory {}".format(ArchiveDestinationDirectory))
    zipFiles = glob.glob(ArchiveDestinationDirectory + "*.zip", recursive=False)
except Exception as e:
    print(e)
    zipFiles = []


# Report number of zip files found
print("{} zip files found".format(len(zipFiles)))

# unzip each file in zipFiles
for zFile in zipFiles:
    try:
        print("Unzipping file {}".format(zFile))
        zip_ref = zf.ZipFile(zFile, 'r')
        zip_ref.extractall(DestinationDirectory)
        zip_ref.close()
    except Exception as e:
        print(e)

# get list of txt files in the In directory
try:
    print("Getting txt files from directory {}".format(DestinationDirectory))
    txtFiles = glob.glob(DestinationDirectory + "*.txt", recursive=False)
except Exception as e:
    print(e)
    txtFiles = []

# Report number of txt files found
print("{} txt files found".format(len(txtFiles)))

# Create database connection
try:
    print("Connecting to SQL Server database")
    connection_string = 'DSN=ETL;'
    conn = db.connect(connection_string)
except Exception as e:
    # without a database connection the load cannot proceed
    print(e)
    sys.exit(1)

# preparing SQL Server
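# Deleting rows already marked as processed clears the stage table for the
# reload, and dropping the index avoids per-row index maintenance during
# the bulk inserts; the index is recreated once the load completes.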
try:
    print("Preparing database for update")
    csr = conn.cursor()
    csr.execute("DELETE FROM [stage table] WHERE Processed = 1")
    csr.execute("DROP INDEX IF EXISTS [index name] ON [stage table]")
    conn.commit()
    csr.close()
except Exception as e:
    print(e)

# creating counters
txtFileCount = len(txtFiles)
n = 1

# process each txt file
for tFile in txtFiles:
    try:
        print("Processng file {} of {}: Update database with {} file data.".format(n, txtFileCount, tFile))
        n += 1
        csr = conn.cursor()
        sql = "BULK INSERT [stage table view] FROM '" + tFile + "' WITH (FIELDTERMINATOR = '|', ROWTERMINATOR = '0x0a', FIRSTROW = 2)"
        csr.execute(sql)
        conn.commit()
        csr.close()
    except Exception as e:
        print(e)

    try:
        print("Deleting {} file".format(tFile))
        if os.path.isfile(tFile):
            os.remove(tFile)
    except Exception as e:
        print(e)

# Complete SQL Server processing
try:
    print("Completing SQL Server update")
    print("Creating index on SQL table")
    csr = conn.cursor()
    csr.execute("CREATE NONCLUSTERED INDEX [index name] ON [stage table]([column name])")
    print("Completing SQL Server update")
    conn.commit()
    csr.close()
    conn.close()
except Exception as e:
    print(e)

# Complete
print("COMPLETE: Process Reload from Archive")