diff --git a/src/01.raw/tse/datasources.json b/src/01.raw/tse/datasources.json index 71d8fe5..1491958 100644 --- a/src/01.raw/tse/datasources.json +++ b/src/01.raw/tse/datasources.json @@ -5,7 +5,7 @@ "sufix_not_remove": ["BRASIL.csv"], "sufix_manage": ["csv", "txt"], "anos": { - "start": 2018, + "start": 2000, "stop": 2022, "step": 2 } @@ -17,7 +17,7 @@ "sufix_not_remove": ["BRASIL.csv"], "sufix_manage": ["csv", "txt"], "anos": { - "start": 2000, + "start": 2018, "stop": 2022, "step": 2 } diff --git a/src/02.bronze/tse/datasources.json b/src/02.bronze/tse/datasources.json new file mode 100644 index 0000000..91cd6d8 --- /dev/null +++ b/src/02.bronze/tse/datasources.json @@ -0,0 +1,69 @@ +{ + "candidatos":{ + "file_format": "csv", + "id_fields":[ + "ANO_ELEICAO", + "CD_ELEICAO", + "NR_TURNO", + "SG_UF", + "NM_UE", + "CD_CARGO", + "SQ_CANDIDATO", + "NR_CANDIDATO" + ], + "partition_fields" : ["ANO_ELEICAO", "SG_UF"], + "path" : "/mnt/datalake/tse/candidatos/{file_format}", + "read_options": { + "sep":";", + "header":"true", + "encoding": "latin1" + }, + "timestamp_field": "ANO_ELEICAO" + }, + + "bem_candidatos":{ + "file_format": "csv", + "id_fields":[ + "ANO_ELEICAO", + "CD_ELEICAO", + "SG_UF", + "SG_UE", + "SQ_CANDIDATO", + "NR_ORDEM_CANDIDATO" + ], + "partition_fields" : ["ANO_ELEICAO", "SG_UF"], + "path" : "/mnt/datalake/tse/bem_candidatos/{file_format}", + "read_options": { + "sep":";", + "header":"true", + "encoding": "latin1", + "multiLine": "true" + }, + "timestamp_field": "ANO_ELEICAO" + }, + + "votacao":{ + "file_format": "csv", + "id_fields" : [ + "ANO_ELEICAO", + "NR_TURNO", + "CD_ELEICAO", + "SG_UF", + "CD_MUNICIPIO", + "NR_ZONA", + "NR_SECAO", + "CD_CARGO", + "NR_VOTAVEL" + ], + "partition_fields" : ["ANO_ELEICAO", "SG_UF"], + "path" : "/mnt/datalake/tse/votacao/{file_format}", + "read_options": { + "sep":";", + "header":"true", + "encoding": "latin1" + }, + "timestamp_field": "ANO_ELEICAO" + } +} + + diff --git a/src/02.bronze/tse/ingestao.py 
# Databricks notebook source
# Bronze-layer ingestion notebook for TSE (Brazilian electoral) datasets.
#
# Reads per-table settings from datasources.json, builds an IngestaoBronze
# ingestor, creates the target Delta table on first run, and then starts the
# incremental stream. Intended to run inside Databricks, where `spark` and
# `dbutils` are provided as globals.
import sys

sys.path.insert(0, "../../lib/")

import ingestors as ing
import dateutil
import dbtools

import json

# COMMAND ----------

# Load the per-table ingestion configuration (paths, key fields, CSV options).
with open("datasources.json", "r") as open_file:
    datasources = json.load(open_file)

# Widget so the operator can choose which table to ingest; defaults to the
# first table declared in datasources.json.
dbutils.widgets.combobox("table",
                         list(datasources.keys())[0],
                         list(datasources.keys()),
                         "Tabela",
                         )

# COMMAND ----------

table = dbutils.widgets.get("table")

database = "bronze.tse"

# Resolve this table's configuration once instead of re-indexing
# `datasources[table]` for every field.
config = datasources[table]

id_fields = config["id_fields"]
partition_fields = config["partition_fields"]
file_format = config["file_format"]
# The path template embeds the file format, e.g. ".../{file_format}" -> ".../csv".
path = config["path"].format(file_format=file_format)
read_options = config["read_options"]
timestamp_field = config["timestamp_field"]

# COMMAND ----------

# Full load and incremental share the same landing path for these sources.
ingestor = ing.IngestaoBronze(
    path_full_load = path,
    path_incremental = path,
    file_format = file_format,
    table_name = table,
    database_name = database,
    id_fields = id_fields,
    timestamp_field = timestamp_field,
    partition_fields = partition_fields,
    read_options = read_options,
    spark=spark
)

# COMMAND ----------

if not dbtools.table_exists(spark, database, table):
    print("Criando tabela em delta...")
    # Fix: createOrReplaceTempView() returns None, so the original
    # `df = ingestor.load_full().createOrReplaceTempView(table)` bound a
    # useless None. Register the temp view, then run the transform against it.
    ingestor.load_full().createOrReplaceTempView(table)
    df_transformed = ingestor.transform(table)

    # Materialize an *empty* Delta table carrying the transformed schema and
    # partitioning, so the stream below has a target to write into.
    (spark.createDataFrame(data=[], schema=df_transformed.schema)
          .write
          .mode("overwrite")
          .format("delta")
          .partitionBy(*partition_fields)
          .option("overwriteSchema", "true")
          .saveAsTable(f"{database}.{table}")
    )
    print("")

# COMMAND ----------

# Kick off the incremental ingestion stream into the (now existing) table.
stream = ingestor.process_stream()