Merging the Geotweets from CGA and Salzburg


Final Schema for Merged Database:

Note: message_id is used as the primary key. These are the attributes with their corresponding data types:

  1. message_id bigint NOT NULL
  2. tweet_date bigint
  3. tweet_text character varying COLLATE pg_catalog."default"
  4. tags character varying COLLATE pg_catalog."default"
  5. tweet_lang character varying COLLATE pg_catalog."default"
  6. source character varying COLLATE pg_catalog."default"
  7. place character varying COLLATE pg_catalog."default"
  8. retweets bigint
  9. tweet_favorites bigint
  10. photo_url character varying COLLATE pg_catalog."default"
  11. quoted_status_id bigint
  12. user_id character varying COLLATE pg_catalog."default"
  13. user_name character varying COLLATE pg_catalog."default"
  14. user_location character varying COLLATE pg_catalog."default"
  15. followers bigint
  16. friends bigint
  17. user_favorites bigint
  18. status bigint
  19. user_lang character varying COLLATE pg_catalog."default"
  20. latitude double precision
  21. longitude double precision
  22. data_source smallint[]
  23. gps boolean
  24. spatialerror double precision
  25. reply_to_user_id character varying COLLATE pg_catalog."default"
  26. reply_to_tweet_id character varying COLLATE pg_catalog."default"
  27. place_id character varying COLLATE pg_catalog."default"

Running Code

    python script.py config.conf

Here script.py is the respective script, config.conf stores the configuration, and utils.py acts as a bridge between the main script and the configuration.

Procedure Used For Cron Job Merge:

1. Create cga_table, salzburg_table, merged_table and final_db if they do not exist.

At any given moment, cga_table and salzburg_table store the CGA and Salzburg records of a particular hour, respectively. merged_table acts as a staging table for the hourly merged geotweet data, which lets us export the merged CSV to the output directory. final_db stores all of the merged data aggregated so far.
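
A minimal sketch of this table-creation step, assuming a psycopg2 connection; the column list follows the merged schema above (varchar is shorthand for character varying, and for brevity every table reuses the final schema, whereas the real staging tables may mirror the raw CSV column types instead):

    import psycopg2

    # Columns from the merged-database schema listed above
    SCHEMA = """
        message_id bigint NOT NULL PRIMARY KEY,
        tweet_date bigint, tweet_text varchar, tags varchar, tweet_lang varchar,
        source varchar, place varchar, retweets bigint, tweet_favorites bigint,
        photo_url varchar, quoted_status_id bigint, user_id varchar,
        user_name varchar, user_location varchar, followers bigint, friends bigint,
        user_favorites bigint, status bigint, user_lang varchar,
        latitude double precision, longitude double precision,
        data_source smallint[], gps boolean, spatialerror double precision,
        reply_to_user_id varchar, reply_to_tweet_id varchar, place_id varchar
    """

    def create_tables(connection):
        # cga_table / salzburg_table hold one hour of source records,
        # merged_table is the hourly staging table, final_db the aggregate
        with connection.cursor() as cursor:
            for table in ("cga_table", "salzburg_table", "merged_table", "final_db"):
                cursor.execute("CREATE TABLE IF NOT EXISTS {} ({});".format(table, SCHEMA))
        connection.commit()

    # Connection parameters are placeholders; the real values live in config.conf
    connection = psycopg2.connect(dbname="geotweets", user="postgres")
    create_tables(connection)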

2. Selecting Necessary Files

This runs as a cron job to achieve a near-realtime merge. Since data arrives with a one-day delay, only the previous day's files need to be accessed. str(datetime.date.today()-datetime.timedelta(1)) returns the previous day's date in the '2020-09-11' format (separated by hyphens), but the Salzburg and CGA file names use the '2020_09_11' format (separated by underscores), so the date needs to be converted:

    import datetime

    # Previous day's date as a list of characters, e.g. list('2020-09-11')
    previous_day = list(str(datetime.date.today()-datetime.timedelta(1)))
    # Swap the hyphens for underscores to match the file-name format
    for index in range(0,len(previous_day)):
        if previous_day[index] == "-":
            previous_day[index] = "_"
    previous_day = "".join(previous_day)

All file names in the CGA folder are saved in the sorted array "temp_files". The [16:26] substring of each CGA file name gives year_month_day, which is compared with "previous_day"; if they are equal, the file belongs to the previous day and is appended to another array, "cga_files".

Then "temp_files" is reused to store the sorted file names of the Salzburg folder. Salzburg file names are in the format year_month_day_time, but single-digit months are saved without a leading 0 (for example 2020_9_11_00.csv.gz). This edge case is detected by checking index 6 of the name: if it is "_" (underscore) the month is single-digit, otherwise it is double-digit. In the single-digit case the zero-padded substring ([0:5] + "0" + [5:9]) is compared with the previous date; in the double-digit case the [0:10] substring is compared. If they match, the file is added to another array, "salzburg_files". A sketch of this selection logic follows.
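
A sketch of the selection logic, assuming the CGA and Salzburg files sit in two local folders whose paths (cga_folder and salzburg_folder here) would come from config.conf:

    import os

    def select_previous_day_files(cga_folder, salzburg_folder, previous_day):
        """Return the previous day's file names from the two source folders."""
        cga_files, salzburg_files = [], []

        # CGA file names embed year_month_day at indices [16:26]
        temp_files = sorted(os.listdir(cga_folder))
        for name in temp_files:
            if name[16:26] == previous_day:
                cga_files.append(name)

        # Salzburg names start with year_month_day_time, but single-digit months
        # lack the leading zero (e.g. 2020_9_11_00.csv.gz)
        temp_files = sorted(os.listdir(salzburg_folder))
        for name in temp_files:
            if name[6] == "_":                      # single-digit month
                candidate = name[0:5] + "0" + name[5:9]
            else:                                   # double-digit month
                candidate = name[0:10]
            if candidate == previous_day:
                salzburg_files.append(name)

        return cga_files, salzburg_files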

3. Adding Data

Now that the previous day's files are in the cga_files and salzburg_files arrays, we need to merge the files that share the same hour. CGA and Salzburg file names carry an hour in the range [0, 23] (inclusive). We run a for loop with a variable "time" over [0, 23] (inclusive) and compare it with the hour of the current CGA file and the current Salzburg file, using "index_cga" and "index_salzburg" as pointers to keep track of the current CGA and Salzburg files. There are then 5 cases (a sketch of this loop is shown after the case list):

a. Both the current CGA and Salzburg files have a time equal to the time variable:

In this case we merge both files and increment both "index_cga" and "index_salzburg" by 1.

b. Only the current CGA file's time is equal to the time variable:

In this case we take only the CGA file and increment only index_cga by 1.

c. Only the current Salzburg file's time is equal to the time variable:

In this case we take only the Salzburg file and increment only index_salzburg by 1.

d. Neither the current CGA nor the current Salzburg file's time is equal to the time variable:

In this case we do not take any file and move on to the next time iteration of the loop.

e. We have added all the files:

Whenever we reach this condition we break out of the for loop immediately.

    int(cga_files[index_cga][-6:-4]) # gives current cga file time
    int(salzburg_files[index_salzburg][10:12]) # gives current salzburg file time
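
A simplified sketch of this hour-matching loop, using the ingest functions add_to_cga_table and add_to_sbg_table described in the next step and the merge and export functions from step 4; the database handles (connection, alchemy_engine), the joining of file names to folder paths, and taking the Salzburg hour from the last underscore-separated field (so that an unpadded month does not shift the index) are assumptions of the sketch:

    def process_previous_day(connection, alchemy_engine, cga_files, salzburg_files):
        """Merge the previous day's CGA and Salzburg files hour by hour."""
        index_cga, index_salzburg = 0, 0

        for time in range(0, 24):                   # hours 0..23 inclusive
            # Case e: every previous-day file from both sources has been added
            if index_cga >= len(cga_files) and index_salzburg >= len(salzburg_files):
                break

            cga_added, salzburg_added = False, False

            # Cases a/b: the current CGA file matches this hour
            if index_cga < len(cga_files) and int(cga_files[index_cga][-6:-4]) == time:
                add_to_cga_table(connection, cga_files[index_cga])
                index_cga += 1
                cga_added = True

            # Cases a/c: the current Salzburg file matches this hour
            if index_salzburg < len(salzburg_files) and \
                    int(salzburg_files[index_salzburg][:-7].split("_")[-1]) == time:
                add_to_sbg_table(alchemy_engine, salzburg_files[index_salzburg])
                index_salzburg += 1
                salzburg_added = True

            # Case d: neither file matches this hour, so move on
            if not cga_added and not salzburg_added:
                continue

            # Name the exported CSV after whichever file was added (see step 4)
            if cga_added:
                file_name = cga_files[index_cga - 1][16:-4]
            else:
                file_name = salzburg_files[index_salzburg - 1][:-7]

            merge(connection)
            export(connection, file_name)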

To add CGA files we created the function add_to_cga_table, which first creates a temporary table and pushes all the records from the current CGA CSV into it. It then takes the records from the temporary table and adds them to cga_table, keeping duplicate records only once. At the end of the query the temporary table is dropped.

To add Salzburg files we use the pandas and SQLAlchemy libraries; the function add_to_sbg_table adds the files into salzburg_table. First the current CSV is read into a data frame using data_frame = pd.read_csv(file, sep='\t', low_memory=False, error_bad_lines=False, lineterminator='\n', compression='gzip'). Some columns had capitalised headers, whereas Postgres automatically converts all headers to lowercase, so the headers were lowercased with data_frame.columns = map(str.lower, data_frame.columns). The data frame records were then added to the table using data_frame.to_sql('salzburg_table', alchemy_engine, if_exists='append', chunksize=10000, index=False). Specifying a chunk size means the data is inserted in chunks instead of all at once. A sketch of both functions is given below.
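
A hedged sketch of the two ingest functions, assuming a psycopg2 connection for the CGA path and an SQLAlchemy engine (alchemy_engine) for the Salzburg path; the CGA CSV format options and the temporary-table details are assumptions:

    import pandas as pd

    def add_to_cga_table(connection, csv_path):
        """Load one hourly CGA CSV into cga_table via a temporary staging table."""
        with connection.cursor() as cursor, open(csv_path) as csv_file:
            # Temporary table with the same shape as cga_table
            cursor.execute("CREATE TEMP TABLE cga_temp (LIKE cga_table);")
            # Push all records from the current CSV into the temporary table
            cursor.copy_expert("COPY cga_temp FROM STDIN WITH CSV HEADER", csv_file)
            # Copy into cga_table, taking duplicate records only once
            cursor.execute("""
                INSERT INTO cga_table
                SELECT DISTINCT ON (message_id) * FROM cga_temp
                ON CONFLICT (message_id) DO NOTHING;
            """)
            cursor.execute("DROP TABLE cga_temp;")
        connection.commit()

    def add_to_sbg_table(alchemy_engine, csv_path):
        """Load one hourly Salzburg CSV (gzipped, tab-separated) into salzburg_table."""
        data_frame = pd.read_csv(csv_path, sep='\t', low_memory=False,
                                 error_bad_lines=False, lineterminator='\n',
                                 compression='gzip')
        # Postgres lowercases unquoted identifiers, so lowercase the headers too
        data_frame.columns = map(str.lower, data_frame.columns)
        # Insert in chunks of 10000 rows instead of pushing everything at once
        data_frame.to_sql('salzburg_table', alchemy_engine, if_exists='append',
                          chunksize=10000, index=False)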

4. Merge and Export:

The merge function is responsible for merging. While merging we treat the Salzburg data as the primary source, meaning that if a record is present in both CGA and Salzburg we take the Salzburg version. To achieve this we first insert the data from salzburg_table into merged_table, and then insert the data from cga_table into merged_table, taking only the records that have not already been added; if a record repeats we skip it with ON CONFLICT DO NOTHING. We also created a custom PostgreSQL function, as_epoch, which converts a given date from TIMESTAMP to BIGINT while inserting records from both tables into merged_table.

The export function exports the merged data from merged_table into a CSV in the output folder. If we added both the CGA and Salzburg files, or just the CGA file, we take the name from the CGA file using the substring cga_file[16:-4] (giving something like "2020_10_08_00"). If we added only the Salzburg file, we take the name from the Salzburg file using the substring salzburg_file[:-7] (also giving something like "2020_10_08_00"). These file names are passed into the export function from the main function; here's the corresponding code:

        if not cga_added and not salzburg_added:
            continue
        elif cga_added:
            file_name = cga_files[index_cga-1][16:-4]
        else:
            file_name = salzburg_files[index_salzburg-1][:-7]
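
A sketch of the merge and export steps, assuming as_epoch simply converts a timestamp to epoch seconds, that the staging tables are column-compatible with merged_table (the real queries enumerate the columns explicitly and wrap tweet_date in as_epoch(...); SELECT * is used here for brevity), and that the output directory name is a placeholder:

    def create_as_epoch(connection):
        """Assumed definition of the custom TIMESTAMP -> BIGINT helper."""
        with connection.cursor() as cursor:
            cursor.execute("""
                CREATE OR REPLACE FUNCTION as_epoch(ts TIMESTAMP) RETURNS BIGINT AS
                'SELECT EXTRACT(EPOCH FROM $1)::BIGINT;' LANGUAGE SQL;
            """)
        connection.commit()

    def merge(connection):
        """Stage one hour of merged data, preferring Salzburg records."""
        with connection.cursor() as cursor:
            # Salzburg first: it is the primary source of data
            cursor.execute("""
                INSERT INTO merged_table SELECT * FROM salzburg_table
                ON CONFLICT (message_id) DO NOTHING;
            """)
            # Then CGA: records already present are skipped
            cursor.execute("""
                INSERT INTO merged_table SELECT * FROM cga_table
                ON CONFLICT (message_id) DO NOTHING;
            """)
        connection.commit()

    def export(connection, file_name, output_dir="output"):
        """Write the staged hour to <output_dir>/<file_name>.csv."""
        path = "{}/{}.csv".format(output_dir, file_name)
        with connection.cursor() as cursor, open(path, "w") as out:
            cursor.copy_expert("COPY merged_table TO STDOUT WITH CSV HEADER", out)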

5. Add to final db and Clean:

In this step the data from merged_table is added to final_db, and merged_table, cga_table and salzburg_table are truncated because their data will not be needed again. We then move to the next loop iteration and repeat steps 3-5.
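
A short sketch of this step under the same assumptions:

    def add_to_final_and_clean(connection):
        """Aggregate the staged hour into final_db, then reset the working tables."""
        with connection.cursor() as cursor:
            # ON CONFLICT is an assumed safeguard against re-processing an hour
            cursor.execute("""
                INSERT INTO final_db SELECT * FROM merged_table
                ON CONFLICT (message_id) DO NOTHING;
            """)
            # The hourly data is no longer needed once aggregated
            cursor.execute("TRUNCATE merged_table, cga_table, salzburg_table;")
        connection.commit()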

Procedure Used For ETL:

Most of the procedure is the same as the cron job; only the file-selection and file-adding logic changes. Instead of selecting the previous day's files, we select all the files from CGA and Salzburg and store them in the sorted arrays "cga_files" and "salzburg_files" respectively. There can then be 5 cases (a sketch of this loop is given at the end of this section):

a. The current CGA file and the current Salzburg file have the same time frame (meaning the same year_month_day_time)

In this case we use both of these files and increment index_salzburg and index_cga by 1 to move on to the next files.

b. The current CGA file is earlier than the current Salzburg file

In this case only the current CGA file is used: since the files are sorted and the current Salzburg file's time frame is ahead of the CGA one, a later CGA file may still match the current Salzburg time frame. So we take the current CGA file and increment index_cga by 1.

c. The current Salzburg file is earlier than the current CGA file

This is the opposite of the case above: here we take the current Salzburg file and increment index_salzburg by 1.

d. All Salzburg files have been seen

In this case we take only the CGA file and increment index_cga by 1.

e. All CGA files have been seen

In this case we take only the Salzburg file and increment index_salzburg by 1.

In the worst case this process runs in O(M+N) time, where M and N are the numbers of CGA and Salzburg files respectively; this worst case occurs when the CGA and Salzburg files are disjoint sets. At every iteration we check whether all files from both CGA and Salzburg have been read, and if so we break out of the loop immediately.

    cga_files[index_cga][16:-4] # gives year_month_day_time for cga file
    salzburg_files[index_salzburg][0:-7] # gives year_month_day_time for salzburg file
    time.strptime(cga_files[index_cga][16:-4],"%Y_%m_%d_%H") # gives struct_time object with year, month, day and hour
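
A sketch of the ETL two-pointer loop described above, reusing the ingest, merge and export helpers from the cron-job procedure; the time_frame helper is illustrative (not part of the real script), and comparing its struct_time results orders the files by year, month, day and hour:

    import time

    def time_frame(name, is_cga):
        """Parse a file name into a comparable struct_time (year, month, day, hour)."""
        key = name[16:-4] if is_cga else name[0:-7]
        return time.strptime(key, "%Y_%m_%d_%H")

    def etl_merge(connection, alchemy_engine, cga_files, salzburg_files):
        """Merge all CGA and Salzburg files, oldest time frame first."""
        index_cga, index_salzburg = 0, 0

        while index_cga < len(cga_files) or index_salzburg < len(salzburg_files):
            cga_left = index_cga < len(cga_files)
            sbg_left = index_salzburg < len(salzburg_files)

            take_cga = take_sbg = False
            if cga_left and sbg_left:
                t_cga = time_frame(cga_files[index_cga], True)
                t_sbg = time_frame(salzburg_files[index_salzburg], False)
                take_cga = t_cga <= t_sbg        # cases a and b
                take_sbg = t_sbg <= t_cga        # cases a and c
            elif cga_left:                       # case d: all Salzburg files seen
                take_cga = True
            else:                                # case e: all CGA files seen
                take_sbg = True

            if take_cga:
                add_to_cga_table(connection, cga_files[index_cga])
                file_name = cga_files[index_cga][16:-4]
                index_cga += 1
            if take_sbg:
                add_to_sbg_table(alchemy_engine, salzburg_files[index_salzburg])
                if not take_cga:
                    file_name = salzburg_files[index_salzburg][:-7]
                index_salzburg += 1

            merge(connection)
            export(connection, file_name)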