Skip to content

Commit

Permalink
Merge pull request #58 from TeoMeWhy/main
Browse files Browse the repository at this point in the history
Sync dota data from main
  • Loading branch information
TeoCalvo authored Aug 17, 2023
2 parents 509630c + 8085f61 commit 348f8ac
Show file tree
Hide file tree
Showing 14 changed files with 322 additions and 57 deletions.
30 changes: 18 additions & 12 deletions databases/create.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
-- Databricks notebook source
-- DBTITLE 1,Olist
CREATE DATABASE IF NOT EXISTS bronze.olist;
CREATE DATABASE IF NOT EXISTS silver.olist;
-- DBTITLE 1,DataSUS
CREATE DATABASE IF NOT EXISTS bronze.datasus;
CREATE DATABASE IF NOT EXISTS silver.datasus;

-- COMMAND ----------

-- DBTITLE 1,Dota
CREATE DATABASE IF NOT EXISTS bronze.dota;
CREATE DATABASE IF NOT EXISTS silver.dota;

-- COMMAND ----------

-- DBTITLE 1,IBGE
CREATE DATABASE IF NOT EXISTS bronze.ibge;
CREATE DATABASE IF NOT EXISTS silver.ibge;

-- COMMAND ----------

Expand All @@ -17,12 +29,6 @@ CREATE DATABASE IF NOT EXISTS silver.pizza_query;

-- COMMAND ----------

-- DBTITLE 1,DataSUS
CREATE DATABASE IF NOT EXISTS bronze.datasus;
CREATE DATABASE IF NOT EXISTS silver.datasus;

-- COMMAND ----------

-- DBTITLE 1,Dota
CREATE DATABASE IF NOT EXISTS bronze.dota;
CREATE DATABASE IF NOT EXISTS silver.dota;
-- DBTITLE 1,Olist
CREATE DATABASE IF NOT EXISTS bronze.olist;
CREATE DATABASE IF NOT EXISTS silver.olist;
12 changes: 12 additions & 0 deletions src/01.raw/datasus/datasources.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"sihsus": {
"origin": "ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc",
"target": "/dbfs/mnt/datalake/datasus/rd/dbc/landing/RD{uf}{ano}{mes}.dbc",
"period": "monthly"
},
"sinasc": {
"origin": "ftp://ftp.datasus.gov.br/dissemin/publicos/SINASC/1996_/Dados/DNRES/DN{uf}{ano}.dbc",
"target": "/dbfs/mnt/datalake/datasus/sinasc/dbc/landing/DN{uf}{ano}.dbc",
"period": "yearly"
}
}
82 changes: 60 additions & 22 deletions src/01.raw/datasus/ingestao.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import urllib.request
from multiprocessing import Pool
from tqdm import tqdm
import json

import datetime
from dateutil.relativedelta import relativedelta
Expand All @@ -15,40 +16,77 @@

import dttools

def get_data_uf_ano_mes(uf, ano, mes):
url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc"
class IngestionRawSUS:

file_path = f"/dbfs/mnt/datalake/datasus/rd/dbc/landing/RD{uf}{ano}{mes}.dbc"
try:
resp = urllib.request.urlretrieve(url, file_path)
except:
print("Não foi possível coletar o arquivo.")
def __init__(self, ufs, date_range, source, n_jobs=2):
self.ufs = ufs
self.n_jobs = n_jobs

def get_data_uf(uf, datas):
for i in tqdm(datas):
ano, mes, dia = i.split("-")
ano = ano[-2:]
get_data_uf_ano_mes(uf, ano, mes)
with open('datasources.json', 'r') as open_file:
datasources = json.load(open_file)

self.origin = datasources[source]['origin']
self.target = datasources[source]['target']
self.period = datasources[source]['period']

self.dates=[]
self.set_dates(date_range)

def set_dates(self, date_range):
self.dates = dttools.date_range(date_range[0], date_range[-1], period=self.period)


def get_data_uf_ano_mes(self, uf, ano, mes):

url = self.origin.format(uf=uf, ano=ano, mes=mes)
file_path = self.target.format(uf=uf, ano=ano, mes=mes)

try:
resp = urllib.request.urlretrieve(url, file_path)

except:
print(f"Não foi possível coletar o arquivo. {uf} | {ano}-{mes}-01")

def get_data_uf(self, uf):
for i in tqdm(self.dates):
ano, mes, dia = i.split("-")

if self.period == 'monthly':
ano = ano[-2:]

self.get_data_uf_ano_mes(uf, ano, mes)

ufs = ["RO", "AC", "AM", "RR","PA",
"AP", "TO", "MA", "PI", "CE",
"RN", "PB", "PE", "AL", "SE",
"BA", "MG", "ES", "RJ", "SP",
"PR", "SC", "RS", "MS", "MT",
"GO","DF"]

def auto_execute(self):
with Pool(self.n_jobs) as pool:
pool.map(self.get_data_uf, self.ufs)


# COMMAND ----------

datasource = dbutils.widgets.get("datasource")
dt_start = dbutils.widgets.get("dt_start")
dt_stop = dbutils.widgets.get("dt_stop")
delay = int(dbutils.widgets.get("delay"))

dt_start = (datetime.datetime.strptime(dt_start, "%Y-%m-%d") - relativedelta(months=delay)).strftime("%Y-%m-01")

datas = dttools.date_range(dt_start, dt_stop, monthly=True)
to_download = [(uf, datas) for uf in ufs]

with Pool(10) as pool:
pool.starmap(get_data_uf, to_download)
ufs = ["RO", "AC", "AM", "RR","PA",
"AP", "TO", "MA", "PI", "CE",
"RN", "PB", "PE", "AL", "SE",
"BA", "MG", "ES", "RJ", "SP",
"PR", "SC", "RS", "MS", "MT",
"GO", "DF"]

ufs.sort(reverse=True)


ing = IngestionRawSUS(ufs=ufs,
date_range=[dt_start, dt_stop],
source=datasource,
n_jobs=10)

# COMMAND ----------

ing.auto_execute()
20 changes: 20 additions & 0 deletions src/01.raw/datasus/ingestion_cid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Databricks notebook source
# Raw ingestion of the CID-10 morbidity list from DATASUS TabNet.
# lxml is required by pandas.read_html to parse the HTML tables.
# MAGIC %pip install lxml

# COMMAND ----------

import pandas as pd

# DATASUS TabNet page that publishes the CID-10 morbidity list tables.
url = 'http://tabnet.datasus.gov.br/cgi/sih/mxcid10lm.htm'

# read_html returns one DataFrame per <table> on the page; index 2 is the
# table of interest here — assumes the page layout is stable (TODO confirm).
dfs = pd.read_html(url)
df = dfs[2]
# The scraped table has a two-level column header; keep only the inner level.
df.columns = df.columns.levels[1]

# COMMAND ----------

# Persist the raw table to the datalake landing area as ';'-separated CSV.
df.to_csv("/dbfs/mnt/datalake/datasus/cid/volume1.csv", index=False, sep=";")

# COMMAND ----------

# Display the result in the notebook output for a quick sanity check.
df
26 changes: 17 additions & 9 deletions src/01.raw/datasus/transform.r
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
# Databricks notebook source
install.packages("read.dbc")
install.packages("doParallel")
install.packages("jsonlite")

# COMMAND ----------

library(read.dbc)
library(foreach)
library(doParallel)
library(jsonlite)

date = format(Sys.time(), "%Y%m%d")

dbc_folder <- "/dbfs/mnt/datalake/datasus/rd/dbc/landing"
csv_folder <- "/dbfs/mnt/datalake/datasus/rd/csv"
datasource <- dbutils.widgets.get("datasource")
datasources <- fromJSON("datasources.json")

path = datasources[datasource][[1]]['target'][[1]]
partes <- unlist(strsplit(path, "/"))
partes <- partes[-length(partes)]
dbc_folder <- paste(partes, collapse = "/")
csv_folder <- sub('/dbc/landing', '/csv', dbc_folder)

files <- list.files(dbc_folder, full.names=TRUE)

Expand All @@ -31,12 +39,12 @@ etl <- function(f) {

registerDoParallel(8)
while (sum(is.na(files)) != length(files)) {
batch = files[1:min(8, length(files))]
files = files[1+min(8, length(files)):length(files)]
foreach (i=batch) %dopar% {
print(i)
if (is.na(i) == FALSE) {
etl(i)
}
batch = files[1:min(8, length(files))]
files = files[1+min(8, length(files)):length(files)]
foreach (i=batch) %dopar% {
print(i)
if (is.na(i) == FALSE) {
etl(i)
}
}
}
73 changes: 73 additions & 0 deletions src/02.bronze/datasus/ingestao_cid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Databricks notebook source
import pandas as pd
import string

def range_letter(x, stop_number=None):
    """Expand a same-letter CID-10 interval into the list of codes it covers.

    Parameters
    ----------
    x : str
        Interval such as ``"A01-A05"``. The first character is the letter;
        the digits after it are the start number. When *stop_number* is
        given, only the part before the ``"-"`` is used as the start.
    stop_number : int, optional
        Explicit upper bound; when ``None`` it is parsed from the part of
        *x* after the ``"-"``.

    Returns
    -------
    list[str]
        Zero-padded codes, inclusive of both ends, e.g. ``["A01", ..., "A05"]``.
    """
    letter = x[0]
    start_number = int(x.split("-")[0][1:])
    # Compare to None with "is", not "==" (PEP 8); behavior is unchanged.
    if stop_number is None:
        stop_number = int(x.split("-")[1][1:])

    return [f'{letter}{i:02d}' for i in range(start_number, stop_number + 1)]


def make_range(x):
    """Expand a CID-10 range such as ``"A00-B99"`` into individual codes.

    Ranges may span several letters: the current letter is filled up to 99,
    then the next letter restarts at 1, until the stop letter/number is
    reached. Assumes uppercase ASCII code letters — TODO confirm inputs.

    Parameters
    ----------
    x : str
        A range like ``"A00-B99"`` or a single code like ``"A05"``.

    Returns
    -------
    list[str] | str
        The expanded list of codes, or *x* unchanged when it is not a
        well-formed two-part range (no ``"-"``, or more than one).
    """
    x = x.strip(" ")

    # Not a simple "start-stop" range: return the value untouched.
    try:
        start, stop = x.split("-")
    except ValueError:
        return x

    letter_start = start[0]
    letter_start_pos = string.ascii_uppercase.find(letter_start)
    # float() first: tolerates dotted sub-codes such as "A00.0".
    number_start = int(float(start[1:]))

    letter_stop = stop[0]
    letter_stop_pos = string.ascii_uppercase.find(letter_stop)
    number_stop = int(float(stop[1:]))

    values = []
    letter_pos = letter_start_pos
    while letter_pos < letter_stop_pos:

        letter = string.ascii_uppercase[letter_pos]

        # Fill the current letter up to 99 ...
        values.extend(range_letter(f'{letter}{number_start:02d}-{letter}99'))

        # ... then restart numbering at 1 on the next letter.
        letter_pos += 1
        number_start = 1

    # Finally, the stop letter from the current start up to the stop number.
    values.extend(range_letter(f'{letter_stop}{number_start}-{letter_stop}{number_stop}'))

    return values

# COMMAND ----------

# Load the raw CID-10 table landed by the raw-layer notebook.
df = pd.read_csv("/dbfs/mnt/datalake/datasus/cid/volume1.csv", sep=";")

# COMMAND ----------

# Each 'Descrição' cell holds a comma-separated list of CID-10 ranges;
# split it into a Python list so it can be exploded row-wise.
df['DescricaoLista'] = df['Descrição'].fillna("").apply(lambda x: x.split(","))

# One row per range; iloc[:-1] drops the final row — presumably a
# footer/total line from the scraped table (TODO confirm).
df_explode = df.explode("DescricaoLista").iloc[:-1]
df_explode = df_explode[df_explode["Código"] != '-']

# Expand each range (e.g. "A00-A05") into individual CID-10 codes, then
# explode again so there is one row per concrete code.
df_explode["DescricaoListaRange"] = df_explode["DescricaoLista"].apply(make_range)
df_completa = df_explode.explode("DescricaoListaRange")

# Rename to the lakehouse naming convention and keep only these columns.
# NOTE(review): mapping 'Códigos da CID-10' -> descCID and 'Descrição' ->
# codCID10 looks swapped at first glance — verify against the source table.
columns = {
"Capítulo": "descCapituloCID" ,
"Código": "codCID",
"Códigos da CID-10": "descCID" ,
"Descrição": "codCID10" ,
"DescricaoListaRange": "codCID10dst",
}

df_completa = df_completa.rename(columns=columns)[columns.values()]
sdf = spark.createDataFrame(df_completa)

# COMMAND ----------

# Overwrite the bronze Delta table with the freshly expanded CID codes.
sdf.write.format("delta").mode("overwrite").saveAsTable("bronze.datasus.cid")
10 changes: 10 additions & 0 deletions src/02.bronze/ibge/ingestao_municipios.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Databricks notebook source
# Ingest the IBGE Brazilian-municipalities CSV into the bronze layer
# as a Delta table, overwriting any previous load.
source_path = "/mnt/datalake/ibge/municipios_brasileiros.csv"

municipios_df = spark.read.csv(source_path, header=True, sep=";")

(municipios_df
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronze.ibge.municipios_brasileiros"))
7 changes: 7 additions & 0 deletions src/03.silver/datasus/cid.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Databricks notebook source
-- Full refresh of the silver CID table: drop and recreate on every run
-- (no incremental load at this step).
DROP TABLE IF EXISTS silver.datasus.cid;

-- Straight 1:1 copy of the bronze CID table into the silver layer.
CREATE TABLE IF NOT EXISTS silver.datasus.cid AS (
SELECT *
FROM bronze.datasus.cid
);
29 changes: 27 additions & 2 deletions src/03.silver/datasus/rd_sih.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,22 @@ DROP TABLE IF EXISTS silver.datasus.rd_sih;

CREATE TABLE IF NOT EXISTS silver.datasus.rd_sih AS

WITH tb_uf AS (
select distinct codUF, descUF
from silver.ibge.municipios_brasileiros
),

tb_cid AS (
select *
from silver.datasus.cid
where codCID not like '%-%'
and codCID10 not like '%.%'
qualify row_number() over (partition by codCID10dst order by codCID desc) = 1
)

SELECT
t1.UF_ZI,
-- t1.UF_ZI,
t32.descUF,
t1.ANO_CMPT,
t1.MES_CMPT,
t4.descEspecialidade,
Expand Down Expand Up @@ -44,7 +58,9 @@ SELECT
TO_DATE(t1.DT_INTER, 'yyyymmdd') AS DtInternacao,
TO_DATE(t1.DT_SAIDA, 'yyyymmdd') AS DtSaida,
t1.DIAG_PRINC,
t33.descCID AS descDiagnosticoCIDPrinc,
t1.DIAG_SECUN,
t34.descCID AS descDiagnosticoCIDSec,
t17.`descTipoCobrança` AS descTipoCobranca,
t10.descNaturezaHospitalSUS,
t11.descNaturezaJuridica,
Expand Down Expand Up @@ -160,7 +176,8 @@ ON t1.RUBRICA = t14.codRubrica
LEFT JOIN bronze.datasus.seqaih_5 AS t15
ON t1.SEQ_AIH5 = t15.codSeqAIH

LEFT JOIN bronze.datasus.subtipo_financ AS t16
LEFT JOIN bronze.datasusOutras complicações da gravidez e do parto
.subtipo_financ AS t16
ON t1.FAEC_TP = t16.codSubTipoFinanciamento

LEFT JOIN bronze.datasus.tipo_cobranca AS t17
Expand Down Expand Up @@ -208,4 +225,12 @@ ON t1.DIAGSEC8 = t30.codTpDiagSecundario
LEFT JOIN bronze.datasus.tp_diagsecundario as t31
ON t1.DIAGSEC9 = t31.codTpDiagSecundario

LEFT JOIN tb_uf AS t32
ON substring(t1.UF_ZI,0,2) = t32.codUF

LEFT JOIN tb_cid AS t33
ON substring(t1.DIAG_PRINC,0,3) = t33.codCID10dst

LEFT JOIN tb_cid AS t34
ON substring(t1.DIAG_SECUN,0,3) = t34.codCID10dst
;
9 changes: 9 additions & 0 deletions src/03.silver/ibge/municipios_brasileiros.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Databricks notebook source
-- Full refresh of the silver municipalities table: drop and recreate on
-- every run (no incremental load at this step).
DROP TABLE IF EXISTS silver.ibge.municipios_brasileiros;

-- Straight 1:1 copy of the bronze municipalities table into silver.
CREATE TABLE IF NOT EXISTS silver.ibge.municipios_brasileiros AS (

SELECT *
FROM bronze.ibge.municipios_brasileiros

);
Loading

0 comments on commit 348f8ac

Please sign in to comment.