Skip to content

Commit

Permalink
Add TSE to bronze (all tables)
Browse files Browse the repository at this point in the history
  • Loading branch information
TeoCalvo authored and TeoCalvo committed Jan 18, 2024
1 parent 7db38f8 commit 17536ef
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/01.raw/tse/datasources.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"sufix_not_remove": ["BRASIL.csv"],
"sufix_manage": ["csv", "txt"],
"anos": {
"start": 2018,
"start": 2000,
"stop": 2022,
"step": 2
}
Expand All @@ -17,7 +17,7 @@
"sufix_not_remove": ["BRASIL.csv"],
"sufix_manage": ["csv", "txt"],
"anos": {
"start": 2000,
"start": 2018,
"stop": 2022,
"step": 2
}
Expand Down
69 changes: 69 additions & 0 deletions src/02.bronze/tse/datasources.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
{
"candidatos":{
"file_format": "csv",
"id_fields":[
"ANO_ELEICAO",
"CD_ELEICAO",
"NR_TURNO",
"SG_UF",
"NM_UE",
"CD_CARGO",
"SQ_CANDIDATO",
"NR_CANDIDATO"
],
"partition_fields" : ["ANO_ELEICAO", "SG_UF"],
"path" : "/mnt/datalake/tse/candidatos/{file_format}",
"read_options": {
"sep":";",
"header":"true",
"encoding": "latin1"
},
"timestamp_field": "ANO_ELEICAO"
},

"bem_candidatos":{
"file_format": "csv",
"id_fields":[
"ANO_ELEICAO",
"CD_ELEICAO",
"SG_UF",
"SG_UE",
"SQ_CANDIDATO",
"NR_ORDEM_CANDIDATO"
],
"partition_fields" : ["ANO_ELEICAO", "SG_UF"],
"path" : "/mnt/datalake/tse/bem_candidatos/{file_format}",
"read_options": {
"sep":";",
"header":"true",
"encoding": "latin1",
"multiLine": "true"
},
"timestamp_field": "ANO_ELEICAO"
},

"votacao":{
"file_format": "csv",
"id_fields" : [
"ANO_ELEICAO",
"NR_TURNO",
"CD_ELEICAO",
"SG_UF",
"CD_MUNICIPIO",
"NR_ZONA",
"NR_SECAO",
"CD_CARGO",
"NR_VOTAVEL"
],
"partition_fields" : ["ANO_ELEICAO", "SG_UF"],
"path" : "/mnt/datalake/tse/votacao/{file_format}",
"read_options": {
"sep":";",
"header":"true",
"encoding": "latin1"
},
"timestamp_field": "ANO_ELEICAO"
}
}


70 changes: 70 additions & 0 deletions src/02.bronze/tse/ingestao.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Databricks notebook source
import sys

sys.path.insert(0, "../../lib/")

import ingestors as ing
import dateutil
import dbtools

import json

# COMMAND ----------

with open("datasources.json", "r") as open_file:
datasources = json.load(open_file)

dbutils.widgets.combobox("table",
list(datasources.keys())[0],
list(datasources.keys()),
"Tabela",
)

# COMMAND ----------

table = dbutils.widgets.get("table")

database = "bronze.tse"

id_fields = datasources[table]["id_fields"]
partition_fields = datasources[table]["partition_fields"]
file_format = datasources[table]["file_format"]
path = datasources[table]["path"].format(file_format=file_format)
read_options = datasources[table]["read_options"]
timestamp_field = datasources[table]["timestamp_field"]

# COMMAND ----------

ingestor = ing.IngestaoBronze(
path_full_load = path,
path_incremental = path,
file_format = file_format,
table_name = table,
database_name = database,
id_fields = id_fields,
timestamp_field = timestamp_field,
partition_fields = partition_fields,
read_options = read_options,
spark=spark
)

# COMMAND ----------

if not dbtools.table_exists(spark, database, table):
print("Criando tabela em delta...")
df = ingestor.load_full().createOrReplaceTempView(table)
df_transformed = ingestor.transform(table)

(spark.createDataFrame(data=[], schema=df_transformed.schema)
.write
.mode("overwrite")
.format("delta")
.partitionBy(*partition_fields)
.option("overwriteSchema", "true")
.saveAsTable(f"{database}.{table}")
)
print("")

# COMMAND ----------

stream = ingestor.process_stream()

0 comments on commit 17536ef

Please sign in to comment.