Google Analytics is really useful, but sometimes you want to retrieve all the data and perform your own analysis and create your own reports. In this post you will see how to do that with Python.
The first thing to do is to download the Google key. To do so you only need to follow the Google Analytics API steps.
To sum them up, you will need to create a service account and download its json private key. You will also need to enable the usage of the Google Analytics API.
From the previous step you created an account with an email similar to your_project@your_web.iam.gserviceaccount.com (you obviously need to change `your_web` and `your_project`). Then give it read permissions through Manage Users at the account level. It is important that you grant it at the account level in order for it to work. It can take some time until the new user has reading permissions.
Get the ID that you will need to use with the API. There are three levels, and the ID you need is in the last one, the one marked in red in the image.
You can run the `test_conexion.py` code to check that everything is working.
```python
from datetime import date, timedelta

from apiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials

# Those are the two constants you need to change
FILE_GA_KEY = ""  # path to the key file
PROFILE = ""  # profile ID


def get_ga_service():
    """ Connect to GA API service """
    credentials = ServiceAccountCredentials.from_json_keyfile_name(
        FILE_GA_KEY, scopes=["https://www.googleapis.com/auth/analytics.readonly"]
    )

    # Build the service object.
    return build("analytics", "v3", credentials=credentials)


def test():
    """ Tests the connection """
    service = get_ga_service()

    # Feel free to change those values
    end = date.today()
    dimensions = ["ga:date", "ga:deviceCategory"]
    metrics = ["ga:users", "ga:sessions"]

    start = end - timedelta(7)

    # Query the API
    kwa = {
        "ids": "ga:{}".format(PROFILE),
        "start_date": "{:%Y-%m-%d}".format(start),
        "end_date": "{:%Y-%m-%d}".format(end),
        "metrics": ",".join(metrics),
        "dimensions": ",".join(dimensions),
        "max_results": 20,
    }
    return service.data().ga().get(**kwa).execute()
```
Remember to edit both the `FILE_GA_KEY` and `PROFILE` constants using the values from the previous steps.
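If the connection works, a quick way to inspect the response is to print what it contains. This is only a small sketch, assuming you append it at the bottom of `test_conexion.py`; the v3 Core Reporting API response includes `columnHeaders` and `rows`, which are also used later in this post:

```python
# Quick sanity check for the test() helper defined above
if __name__ == "__main__":
    data = test()

    # Print the column names and the returned rows
    print([h["name"] for h in data["columnHeaders"]])
    for row in data["rows"]:
        print(row)
```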
Now it is time to decide what you want to download. There are two types of parameters: metrics and dimensions. You can check the possible values for both of them with the query explorer, where you can also try out your queries.
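For reference, these are the dimension and metric identifiers used throughout this post; the complete list is available in the query explorer:

```python
# Dimension and metric identifiers that appear later in this post
dimensions = ["ga:date", "ga:deviceCategory", "ga:campaign", "ga:adwordsCampaignID"]
metrics = ["ga:users", "ga:sessions", "ga:bounceRate"]
```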
In this step I will describe the functions in the `etl.py` file.
The first thing to do is to create the Google Analytics service with:
```python
def get_ga_service(key_file_location=None):
    """ Connect to GA API service """
    if key_file_location is None:
        key_file_location = c.FILE_GA_KEY

    scope = "https://www.googleapis.com/auth/analytics.readonly"
    credentials = ServiceAccountCredentials.from_json_keyfile_name(
        key_file_location, scopes=[scope]
    )

    # Build the service object.
    return build("analytics", "v3", credentials=credentials)
```
This function takes the default path of the Google Analytics key from the constants while still allowing you to pass a custom one.
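The functions in `etl.py` assume a constants module imported as `c` (plus `pd` for pandas, a `log` logger and a small `u` utils helper). The constants file is not shown in the post, but based on how its values are used, a minimal sketch could look like this; the concrete values are just placeholders:

```python
# constants.py -- a minimal sketch; the concrete values are placeholders
FILE_GA_KEY = "ga_key.json"  # path to the service account json key
PROFILE = "123456789"        # Google Analytics view (profile) ID

TIMEWINDOW = 7      # days of data retrieved on each run
MAX_RESULTS = 10000 # max rows requested from the GA API

# QUERY_DATA, DELETE and TRUNCATE (discussed later in the post) also live here
```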
Then, to retrieve the actual data, you should use the `get_ga_df` function.
```python
def get_ga_df(query_data, end=date.today(), service=None):
    """ Retrieve GA data as dataframe """
    if service is None:
        service = get_ga_service()

    dimensions = query_data["dimensions"].keys()
    metrics = query_data["metrics"].keys()

    start = end - timedelta(c.TIMEWINDOW)

    # Query the API
    kwa = {
        "ids": "ga:{}".format(c.PROFILE),
        "start_date": "{:%Y-%m-%d}".format(start),
        "end_date": "{:%Y-%m-%d}".format(end),
        "metrics": ",".join(metrics),
        "dimensions": ",".join(dimensions),
        "max_results": c.MAX_RESULTS,
    }
    data = service.data().ga().get(**kwa).execute()

    # Create df from data obtained through the API
    columns = [x["name"] for x in data["columnHeaders"]]
    df = pd.DataFrame(data["rows"], columns=columns)

    # Handle missing values
    for x in dimensions:
        df[x] = df[x].replace("(not set)", float("nan"))

    # Handle missing campaigns
    if ("ga:adwordsCampaignID" in df.columns) and ("ga:campaign" in df.columns):
        df.dropna(subset=["ga:adwordsCampaignID", "ga:campaign"], inplace=True)

    # Rename columns
    rename_dict = {i: x[0] for i, x in query_data["dimensions"].items()}
    rename_dict.update({i: x[0] for i, x in query_data["metrics"].items()})
    df = df.rename(index=str, columns=rename_dict)

    # Transform types
    for x in ["dimensions", "metrics"]:
        df = u.fix_types(df, query_data[x].values())

    log.info("Data read")
    return df
```
This function retrieves a pandas dataframe using the requested parameters.
After creating the raw dataframe it renames the columns using the `QUERY_DATA` dictionary from the `constants.py` file.
It will also change the column types using the same dictionary.
So, for example, having:
```python
QUERY_DATA = {
    "traffic_google": {
        "dimensions": {
            "ga:date": ("date", "date"),
            "ga:deviceCategory": ("device", str),
        },
        "metrics": {
            "ga:users": ("users", int),
            "ga:sessions": ("sessions", int),
            "ga:bounceRate": ("bounce_rate", float),
        },
    },
}
```
It will retrieve the `ga:deviceCategory` dimension, rename the column to `device` and store it as a string.
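As a usage sketch, assuming the `QUERY_DATA` entry above lives in `constants.py`, retrieving that table is a single call:

```python
# Hypothetical usage of get_ga_df with the query defined above
df = get_ga_df(c.QUERY_DATA["traffic_google"])

print(df.dtypes)  # date, device, users, sessions, bounce_rate with the requested types
print(df.head())
```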
Finally, to run the ETL for all the tables in `QUERY_DATA` there is the `do_etl` function:
```python
def do_etl():
    """ Reads from GA, transforms the data and loads it into mysql """
    time0 = time()

    for tablename, data in c.QUERY_DATA.items():
        # Retrieve data from GA API
        df = get_ga_df(data)

        # Insert data into mysql
        insert_into_mysql(df, tablename)

    log.info("Data imported", time=time() - time0)
This is what you should run every day. By default it will take the data from the last 7 days. Thanks to this redundancy, even if the ETL fails some day it will automatically retrieve the missing data the next day. In order to actually lose data it would need to fail 7 consecutive days.
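One simple way to schedule the daily run is a small entry point triggered by cron. This is just a sketch; the `main.py` file and the crontab entry are assumptions, not part of the original project:

```python
# main.py -- hypothetical entry point so the ETL can be scheduled
from etl import do_etl

if __name__ == "__main__":
    do_etl()

# A possible crontab entry to run it every day at 05:00:
# 0 5 * * * python /path/to/main.py
```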
And the last function is for retrieving all the old data. It is meant to be used only the first time.
```python
def import_old_stats(end=date.today()):
    """ Imports old data up until 2017-01-01 """
    # Load from 2017-01-01
    while end > date(2017, 1, 1):
        time0 = time()
        start = end - timedelta(c.TIMEWINDOW)
        log.info(f"Importing data from {start:%Y-%m-%d} to {end:%Y-%m-%d}")

        for tablename, data in c.QUERY_DATA.items():
            # Retrieve data from GA API
            df = get_ga_df(data, end=end)

            # Insert data into mysql
            insert_into_mysql(df, tablename)

        end -= timedelta(int(c.TIMEWINDOW) + 1)
        log.info("Data imported", time=time() - time0)
```
Right now it will download all data from 2017 until today.
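If you need to run this backfill, a one-off call is enough (a small sketch, assuming the functions live in the `etl.py` module described above):

```python
from etl import import_old_stats

# Backfill everything between 2017-01-01 and today (the default end date)
import_old_stats()
```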
In this example I am going to show how to insert the data into MySQL.
All this code is in the `sql.py` file.
The idea is that the ETL can be run more than once without creating duplicated data. This is important because the ETL is designed with redundancy to tolerate some failures, so duplicates need to be handled.
There are two approaches to this problem:
Every time you download data you will have a dataframe with info from the last 7 days. In order to avoid duplicates you can simply delete all existing rows that fall within the same dates before inserting.
The other approach is to load all the existing data and concatenate it with the new one.
This dataframe will have duplicates, but they can be deleted with the `drop_duplicates` pandas function.
To do so you need to specify which columns define a unique entry.
This solution is less efficient since it needs to load all the existing data, which can be time consuming.
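As a minimal sketch of the second approach with made-up data (the column names here are just an example):

```python
import pandas as pd

# Existing data already stored in the database
df_prev = pd.DataFrame({"date": ["2019-01-01"], "device": ["mobile"], "users": [10]})

# Freshly downloaded data, overlapping one day with the existing data
df_new = pd.DataFrame(
    {"date": ["2019-01-01", "2019-01-02"], "device": ["mobile", "desktop"], "users": [12, 5]}
)

# Concatenate with the new data first so it wins when dropping duplicates
df = pd.concat([df_new, df_prev], sort=False)
df = df.drop_duplicates(subset=["date", "device"])  # columns that define a unique entry
```

Since `drop_duplicates` keeps the first occurrence by default, putting the new data first means the fresh values win; the `insert_into_mysql` function below uses the same ordering.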
The first thing to do is to create the SQLAlchemy engine for MySQL. For other SQL engines check the SQLAlchemy post.
```python
def _get_mysql_engine():
    """ Get MySQL SQLAlchemy engine """
    return sa.create_engine(
        sa.engine.url.URL(
            drivername="mysql+pymysql",
            username="username",  # Change that!!
            password="password",  # Change that!!
            host="host",  # Change that!!
            port=c.PORT,
            database=c.DATABASE,
        ),
        encoding="utf-8",  # Since there will be some japanese chars
    )
```
Using the `create_engine` and `engine.url.URL` functions from SQLAlchemy is the most elegant way to create the engine.
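For a quick sanity check that the connection details are correct, you can run a trivial query (a minimal sketch):

```python
engine = _get_mysql_engine()

with engine.connect() as connection:
    # Should print 1 if the connection works
    print(connection.execute("SELECT 1").scalar())
```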
To retrieve a table you can use:
```python
def get_df_mysql(tablename, columns="*", engine=None):
    """ Retrieves one table from MySQL """
    if engine is None:
        engine = _get_mysql_engine()

    query = "SELECT {} FROM {}".format(columns, tablename)

    with engine.connect() as connection:
        return pd.read_sql_query(query, connection)
```
And finally, the code to insert the data while handling duplicates:
```python
def insert_into_mysql(df, tablename, cols_id=None, engine=None):
    """ Insert dataframe into mysql.

        If the date column is present it will delete all rows that are in the
        same date range as the df to be inserted. Else it will truncate the
        sql table.

        Args:
            df:         dataframe to insert
            tablename:  name of the table
            cols_id:    allow dropping duplicates using those columns
                        instead of deleting all
            engine:     sql_alchemy engine
    """
    if engine is None:
        engine = _get_mysql_engine()

    # If date is present, delete rows of same date
    if "date" in df.columns:
        # Transform date to string to avoid SQL problems
        df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")

        # Get all dates present in df to insert
        dates = df["date"].unique()

        # Delete data of same dates as the data that will be inserted
        sentence = c.DELETE.format(table=tablename, dates="','".join(dates))
        with engine.connect() as connection:
            connection.execute(sentence)

    # Truncate all data or keep non duplicated
    else:
        # Try to merge with existing data
        if cols_id is not None:
            # Retrieve existing data
            df_prev = get_df_mysql(tablename, engine=engine)

            # Drop duplicated data
            df = pd.concat([df, df_prev], sort=False)
            df.drop_duplicates(subset=cols_id, inplace=True)

        with engine.connect() as connection:
            connection.execute(c.TRUNCATE.format(tablename))

    # Insert into SQL
    df.to_sql(name=tablename, con=engine, if_exists="append", index=False)
```
This code checks whether the `date` column is present.
If it is, it will handle duplicates using it; if not, it will use the other approach.
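The `insert_into_mysql` function also relies on two SQL templates, `c.DELETE` and `c.TRUNCATE`, that are not shown in the post. Based on how they are formatted above, they could look roughly like this (an assumption):

```python
# constants.py (continued) -- assumed shapes of the SQL templates used above
DELETE = "DELETE FROM {table} WHERE date IN ('{dates}')"
TRUNCATE = "TRUNCATE TABLE {}"
```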
And with all that you can create an ETL to load data from Google Analytics.