Data Warehouse ETL Framework

ExampleDataProcessing

This notebook shows various methods for working with Spark using Spark SQL and DataFrames. I will discuss what each line is doing.

All the imports necessary to connect to Spark and work with Spark SQL.

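import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as func
from pyspark.sql.types import *

# Start a Spark context, then wrap it in a SQLContext for DataFrame and Spark SQL work.
# SQLContext still works but is deprecated as of Spark 3.0 in favor of SparkSession.
sc = SparkContext()
sqlq = SQLContext(sc)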

This pulls a CSV onto the Spark cluster. The .option("quote","^") call is there because of a quirk. If you don't specify a value for quote, it defaults to ". If your text contains literal double quotes, you don't want Spark treating " as the quote character. So use something funky that would rarely show up in text, like the caret symbol (^).

We are actually going to load two files here and then bump them together later.

rdf = sqlq.read.format("csv").option("header","true").option("delimiter","|").option("quote","^").load("[Path To Your File]")
tnpdf = sqlq.read.format("csv").option("header","true").option("delimiter","|").option("quote","^").load("[Path To Your File]")
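
To see why the quote setting matters, here is a small sketch that uses Python's built-in csv module as a stand-in for Spark's CSV reader. The two parsers are not identical, so treat this as an illustration of the idea only. The sample row and the parse helper are made up.

```python
import csv

# Made-up pipe-delimited row with literal double quotes in the name field.
raw_lines = ['1|"Big" Bob\'s Diner|TX']


def parse(lines, quotechar):
    """Parse pipe-delimited lines using the given quote character."""
    return list(csv.reader(lines, delimiter="|", quotechar=quotechar))


# With " as the quote character, the parser treats the quotes as syntax.
default_rows = parse(raw_lines, '"')

# With ^ as the quote character, the double quotes are just data.
caret_rows = parse(raw_lines, "^")

print(default_rows[0][1])
print(caret_rows[0][1])
```

The second print keeps the field exactly as it appears in the file. The first one should come back with the quotes around Big eaten by the parser, which is the kind of silent damage the caret trick avoids.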

This is how to do a record count.

rdf.count()

Filtering on one column.

rdf = rdf.filter(rdf.[YourColumn] != '')

Filtering on more than one column.

rdf = rdf.filter(rdf.[YourColumn1] != 'SomeValue').filter(rdf.[YourColumn2] != 'SomeOtherValue')

Filtering on more than one value.

ExcludeList = ['Value1','Value2',...'ValueN']
rdf = rdf.filter(~rdf.[YourColumnName].isin(ExcludeList))

Joining data frames.

joined = rdf.join(tnpdf,func.lower(rdf.[YourJoinColumn]) == func.lower(tnpdf.[YourJoinColumn]), how="left")

Dropping a column out of your dataset.

no[YourBooleanColumn] = joined.drop('[YourBooleanColumn]')

Show only 20 records of one column.

no[YourBooleanColumn].select(no[YourBooleanColumn].[YourColumnName]).show(n=20)

Create a user-defined function (UDF) from a lambda.

CheckFor[YourBooleanColumn] = func.udf(lambda x: 'No' if x is None else 'Yes', StringType())

I have no idea what this is doing. I wrote this a year ago. See! This is why we document!

But seriously. There is a course planned called Advanced Data Systems Design And Implementation. In that class, I'll take time out of my precious dwindling existence to break this line of code down.

For now, these are not the droids you're looking for. Move along.

with[YourBooleanColumn] = no[YourBooleanColumn].withColumn('[YourBooleanColumn]',func.lit(CheckFor[YourBooleanColumn](joined.[YourColumnName])))
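
Reading the line on its own terms: the left join leaves nulls in the second DataFrame's columns wherever a row had no match, the UDF turns a null into 'No' and anything else into 'Yes', and withColumn stores that answer under a new column name. The func.lit wrapper looks redundant, since the UDF call already returns a column. Here is the same logic in plain Python, no Spark required, as a rough sketch. The rows, column names, and the add_match_flag helper are all hypothetical.

```python
# Hypothetical rows as they might look after a left join:
# lookup_value is None where the right-hand table had no match.
joined_rows = [
    {"id": 1, "lookup_value": "abc"},
    {"id": 2, "lookup_value": None},
    {"id": 3, "lookup_value": ""},
]

# Same lambda the UDF wraps: None means no match, anything else is a match.
check_for_match = lambda x: "No" if x is None else "Yes"


def add_match_flag(rows, source_column, flag_column):
    """Return new rows with flag_column derived from source_column."""
    return [
        {**row, flag_column: check_for_match(row[source_column])}
        for row in rows
    ]


flagged = add_match_flag(joined_rows, "lookup_value", "has_match")
for row in flagged:
    print(row["id"], row["has_match"])
```

Note that an empty string still counts as 'Yes'. Only a true null gets flagged as 'No'.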

Display columns in a dataset.

with[YourBooleanColumn].columns

Show all columns, filter on one column, show only 20 records. CleanedResults here stands in for whatever DataFrame holds your finished work.

CleanedResults.select('*').where(CleanedResults.[YourColumnName]=='FilterValue').show(n=20)

This is a really important line. I work with data in and data out. Spark does not really have a nice "data out" function which is SUPER WEIRD. I had to do a lot of research to figure out how to spit the results of my work back out as a CSV.

The challenge here is in how Spark works. It is a distributed system. Without boring you with the ins and outs of MapReduce, what essentially happens is that your data is split up among all the machines in the cluster. So it's not like you can just to_csv() and call it a day. The system has to collect all the parts and put Humpty back together.

Anyway, enjoy the fruits of my labor. I should charge you like $2,000 just to give you this one line of code.

CleanedResults.repartition(1).write.option('nullValue',None).csv('[Path To Your Output File]', sep='|',header='true',mode='overwrite')
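
One thing to know about that line: the path you pass to csv() ends up as a directory, and the data lands inside it in a file named something like part-00000-[some id].csv, next to marker files such as _SUCCESS. With repartition(1) there is a single part file. If you need one file with a predictable name, a sketch like this can pull it out after the write finishes. It assumes the output sits on a local or mounted filesystem and is not compressed, and promote_single_part_file is a hypothetical helper, not part of Spark.

```python
import glob
import os
import shutil


def promote_single_part_file(spark_output_dir, target_file):
    """Copy the lone part file from a Spark CSV output directory to target_file."""
    part_files = sorted(glob.glob(os.path.join(spark_output_dir, "part-*.csv")))
    if len(part_files) != 1:
        # More than one part file means the data was not collapsed to one partition.
        raise ValueError(
            f"Expected exactly one part file in {spark_output_dir}, "
            f"found {len(part_files)}"
        )
    shutil.copyfile(part_files[0], target_file)
    return target_file
```

Call it with the same path you gave to csv() and the file name you actually want. It refuses to guess if it finds zero or several part files, which is a deliberate choice: stitching multiple parts together would also mean dealing with repeated header rows.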

Last updated 3 years ago