ETL

This time I downloaded all the historical exchange rates from 1999 to the current date of 25-Mar-2025.

This follows on from my previous blog post on loading historical RBA Exchange Rate data. The CSV file now has 134,462 rows including the header; for this amount of data we're better off using a more formal, structured ETL approach.

Modify The Python Script

The first thing I had to do was modify my Python script slightly, as the initial row I use to validate the file as an RBA historical exchange rate file had changed. I also implemented another option, append_to_output, which, when set to "y", appends data to the output file rather than overwriting it.

Also, on eyeballing the data I noticed the word "Closed" against some dates, which must be public holidays in the relevant foreign country. I'll need to account for this in my process.

                
import os
import sys

CLI_SEPARATOR = "======================================================================\n"
CLI_HEADER = ("\nprocess_forex_csv\n"
              "Processes CSV version of RBA Exchange Rate history:\n"
              "https://www.rba.gov.au/statistics/historical-data.html#exchange-rates\n"
              "from supplied format to a format ready for ETL")
CLI_USAGE = ("Usage:\n"
             "process_forex_csv.py [input_filename] [output_filename] "
             "[format_as_sql (y/n)] [append_to_output (y/n)]\n")
OUTPUT_HEADER = "Date,Currency,Rate\n"
FILE_HEADER_1 = "EXCHANGE RATES ,,,,,,,,,,,,,,,,,,,,,,,"
FILE_HEADER_2 = "HISTORICAL DAILY EXCHANGE RATES OF THE AUSTRALIAN DOLLAR AGAINST:"
FOREX_HEADER = "Series ID"

months = {
    "jan": "01", "feb": "02", "mar": "03", "apr": "04",
    "may": "05", "jun": "06", "jul": "07", "aug": "08",
    "sep": "09", "oct": "10", "nov": "11", "dec": "12"
}


class ProcessForexCSV:

    def __init__(self, input_filename, output_filename, format_for_sql, append_to_output):
        self.input_filename = input_filename
        self.output_filename = output_filename
        self.format_for_sql = format_for_sql
        self.append_to_output = append_to_output
        self.currencies = []

    def process_file(self):
        input_file_path = os.path.join(os.getcwd(), self.input_filename)
        output_file_path = os.path.join(os.getcwd(), self.output_filename)
        with open(input_file_path, "rt") as f:
            new_lines = ""
            # Validate that this looks like an RBA historical exchange rate file.
            line = f.readline()
            if line.find(FILE_HEADER_1) < 0 and line.find(FILE_HEADER_2) < 0:
                print("Invalid file format.")
                return
            # Skip ahead to the "Series ID" row, which carries the currency codes.
            line = f.readline()
            while line.find(FOREX_HEADER) < 0:
                line = f.readline()
            tokens = line.split(",")
            i = 0
            for token in tokens:
                if i > 0:
                    # The currency code is the last three characters of each series ID.
                    self.currencies.append(token[len(token) - 3:].strip())
                i += 1
            print("Currencies:\n{0}".format(CLI_SEPARATOR))
            for currency in self.currencies:
                print("{0}".format(currency))
            # Process the data rows: one output line per (date, currency) pair.
            line = f.readline().strip()
            line_count = 0
            while line != "":
                line_count += 1
                line_tokens = line.split(",")
                token_count = len(line_tokens)
                exchg_date = line_tokens[0]
                original_exchg_date = exchg_date
                # Convert DD-Mon-YYYY to YYYYMMDD for the SQL output format.
                date_tokens = exchg_date.split("-")
                exchg_date = date_tokens[2] + months[date_tokens[1].lower()] + date_tokens[0]
                i = 1
                while i < token_count:
                    # Blank rates become NULL; "Closed" etc. pass through to be
                    # handled at migration time.
                    if line_tokens[i] == "":
                        line_tokens[i] = "NULL"
                    exchg_line = ""
                    if not self.format_for_sql:
                        exchg_line = "{0},{1},{2}\n".format(
                            original_exchg_date, self.currencies[i - 1], line_tokens[i])
                    else:
                        exchg_line = ("select '" + exchg_date + "','"
                                      + self.currencies[i - 1] + "',"
                                      + line_tokens[i] + " union all\n")
                    i += 1
                    if exchg_line != "":
                        new_lines += exchg_line
                line = f.readline().strip()
        # Get rid of the final "union all" (or the trailing newline in CSV mode).
        if self.format_for_sql:
            new_lines = new_lines[:len(new_lines) - len(" union all\n")]
        else:
            new_lines = new_lines[:len(new_lines) - len("\n")]
        file_mode = "at" if self.append_to_output else "wt"
        with open(output_file_path, file_mode) as f2:
            # Only write the CSV header when creating a fresh CSV output file.
            if not self.format_for_sql and not self.append_to_output:
                f2.write(OUTPUT_HEADER)
            f2.write(new_lines)
        print("{0}\n{1} exchange rates {2} to {3}\n{4}".format(
            CLI_SEPARATOR, line_count,
            "appended" if self.append_to_output else "written",
            self.output_filename, CLI_SEPARATOR))


if __name__ == "__main__":
    print(CLI_HEADER)
    print(CLI_SEPARATOR)
    print(CLI_USAGE)
    if len(sys.argv) == 5:
        input_filename = sys.argv[1]
        output_filename = sys.argv[2]
        format_for_sql = str(sys.argv[3]).lower() == "y"
        append_to_output = str(sys.argv[4]).lower() == "y"
        process = ProcessForexCSV(input_filename, output_filename,
                                  format_for_sql, append_to_output)
        process.process_file()
    else:
        print("Invalid number of parameters.")
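The core of the transformation — un-pivoting each wide input row into (date, currency, rate) triples and rewriting the date for SQL — can be sketched in isolation. The sample row and two-currency list below are hypothetical, for illustration only:

```python
months = {
    "jan": "01", "feb": "02", "mar": "03", "apr": "04",
    "may": "05", "jun": "06", "jul": "07", "aug": "08",
    "sep": "09", "oct": "10", "nov": "11", "dec": "12"
}

def to_sql_date(exchg_date):
    """Convert an RBA-style date such as 25-Mar-2025 to YYYYMMDD."""
    day, month, year = exchg_date.split("-")
    return year + months[month.lower()] + day

def explode_row(line, currencies):
    """Turn one wide input row into (date, currency, rate) triples,
    substituting NULL for blank rates."""
    tokens = line.split(",")
    date = tokens[0]
    return [(date, currencies[i - 1], tokens[i] if tokens[i] != "" else "NULL")
            for i in range(1, len(tokens))]

# Hypothetical row with a USD rate present and a blank EUR rate:
rows = explode_row("25-Mar-2025,0.6310,", ["USD", "EUR"])
# rows == [("25-Mar-2025", "USD", "0.6310"), ("25-Mar-2025", "EUR", "NULL")]
print(to_sql_date("25-Mar-2025"))  # 20250325
```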

Database Staging

I will use ETL to populate a staging table in the database. The data type of the columns is simply varchar.

                
create table stage_forex
(
    stage_id int not null identity,
    exchg_date varchar(11) not null,
    exchg_currency varchar(5) not null,
    exchg_rate varchar(20) null,
    constraint PK_stage_forex primary key (stage_id)
)

Now a stored procedure to migrate the data from staging into the final forex table. I have explicitly declared a transaction here for purposes of exposition, but in reality the insert statement is a single transaction anyway. Note line 7, which converts "null", "closed" or "N/A" to NULL. Line 8 then converts the exchange rate to the data type of the destination column, decimal(19,4), an ideal data type for use with currency values.

                
  1. create proc sp_migrate_forex as
  2. begin
  3. begin transaction
  4. begin try
  5. insert forex (exchg_date, exchg_currency, exchg_rate)
  6. select convert(date, exchg_date), convert(varchar(3), exchg_currency),
  7. case when exchg_rate = 'null' or lower(exchg_rate) = 'closed' or lower(exchg_rate) = 'n/a' then NULL
  8. else convert(decimal(19,4), exchg_rate) end
  9. from stage_forex
  10. commit transaction
  11. end try
  12. begin catch
  13. rollback transaction
  14. raiserror ('Error during forex migration', 10, 1)
  15. end catch
  16. end
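The same cleaning rule can be expressed in Python, which is handy for unit-testing the edge cases before running the migration. This is a sketch for checking the logic, not part of the pipeline:

```python
from decimal import Decimal

def clean_rate(raw):
    """Mirror the stored procedure's case expression: map placeholder
    values to None, otherwise parse as a 4-decimal-place Decimal."""
    if raw is None or raw.lower() in ("null", "closed", "n/a"):
        return None
    return Decimal(raw).quantize(Decimal("0.0001"))

print(clean_rate("0.6310"))  # 0.6310
print(clean_rate("Closed"))  # None
print(clean_rate("N/A"))     # None
```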

The ETL Package

I am using SQL Server Integration Services and Visual Studio 2022 to build the ETL package.

  1. I create a new Integration Services project
  2. I drag a Data Flow Task on to the canvas and right-click Edit
  3. I drag a Source Assistant object onto the canvas and select New Connection
  4. I configure the connection to my output.csv created by the Python script:

SSIS Flat File Source

SSIS CSV Source File Preview

I drag a Destination Assistant object onto the canvas, select SQL Server as the Destination type, then double-click New. I choose the Microsoft OLE DB Provider for SQL Server from the Provider list and enter my credentials:

SSIS Source Connection manager

I connect the Flat File Source to the OLE DB Destination:

SSIS Data Flow Task

I right-click Edit the OLE DB Destination, select stage_forex as the destination table then configure the mappings:

SSIS OLE DB Mappings

I execute the SSIS package and after a very brief delay it reports successful execution.

I run a query in SSMS to determine whether there are any potentially problematic exchange rate values and identify the value N/A, which I had not accounted for in my migration stored procedure, so I modify it accordingly.
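This kind of check can also be run against the processed CSV itself before loading. The sketch below (sample rows are made up) flags any rate value that is neither a known placeholder nor parseable as a number:

```python
from decimal import Decimal, InvalidOperation

PLACEHOLDERS = {"null", "closed", "n/a", ""}

def find_problem_rates(rows):
    """Return (date, currency, rate) rows whose rate is neither a known
    placeholder nor parseable as a number."""
    problems = []
    for date, currency, rate in rows:
        if rate.lower() in PLACEHOLDERS:
            continue
        try:
            Decimal(rate)
        except InvalidOperation:
            problems.append((date, currency, rate))
    return problems

# Made-up rows for illustration:
sample = [("25-Mar-2025", "USD", "0.6310"),
          ("25-Mar-2025", "JPY", "Closed"),
          ("25-Mar-2025", "EUR", "oops")]
print(find_problem_rates(sample))  # [('25-Mar-2025', 'EUR', 'oops')]
```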

Check exchange rate values

Finally I run exec sp_migrate_forex in SSMS, and after a brief delay the migration is complete. It reports 134,461 rows affected, which is exactly the original number of rows in my CSV file minus the heading row.

I have constructed a Forex Chart page to query the exchange rates from Azure.

Ed. I moved the data to a MySQL database closer to the code. The latency of Azure was intolerable.