Accelerate HiveQL with Oozie to Spark SQL migration on Amazon EMR

Many customers run big data workloads such as extract, transform, and load (ETL) on Apache Hive to create a data warehouse on Hadoop. Apache Hive has performed well for a long time. But with advancements in infrastructure such as cloud computing and multicore machines with large RAM, Apache Spark started to gain visibility by performing better than Apache Hive.

Customers now want to migrate their Apache Hive workloads to Apache Spark in the cloud to get the benefits of optimized runtime, cost reduction through transient clusters, better scalability by decoupling storage and compute, and flexibility. However, migration from Apache Hive to Apache Spark needs a lot of manual effort to write migration scripts and maintain different Spark job configurations.

In this post, we walk you through a solution that automates the migration from HiveQL to Spark SQL. The solution was used to migrate Hive with Oozie workloads to Spark SQL and run them on Amazon EMR for a large gaming customer. You can also use this approach to develop new jobs with Spark SQL and process them on Amazon EMR. This post assumes that you have a basic understanding of Apache Spark, Hive, and Amazon EMR.

Solution overview

In our example, we use Apache Oozie, which schedules Apache Hive jobs as actions to collect and process data every day.

We migrate these Oozie workflows with Hive actions by extracting the HQL files and the dynamic and static parameters, and converting them to be Spark compliant. Manual conversion is both time consuming and error prone. To convert the HQL to Spark SQL, you would need to sort through existing HQL files, replace the parameters, and change the syntax for a large number of files.

Instead, we can use automation to speed up the migration and reduce heavy lifting tasks, costs, and risks.

We split the solution into two primary components: generating Spark job metadata and running the SQL on Amazon EMR. The first component (metadata setup) consumes existing Hive job configurations and generates metadata such as the number of parameters, number of actions (steps), and file formats. The second component consumes the generated metadata from the first component and prepares the run order of Spark SQL within a Spark session. With this solution, we support basic orchestration and scheduling with the help of AWS services like Amazon DynamoDB and Amazon Simple Storage Service (Amazon S3). We can validate the solution by running queries in Amazon Athena.

In the following sections, we walk through these components and how to use these automations in detail.

Generate Spark SQL metadata

Our batch job consists of Hive steps scheduled to run sequentially. For each step, we run HQL scripts that extract, transform, and aggregate input data into one final Hive table, which stores data in HDFS. We use the following Oozie workflow parser script, which takes the input of an existing Hive job and generates the configuration artifacts needed for running the SQL using PySpark.

Oozie workflow XML parser

We create a Python script to automatically parse the Oozie jobs, including workflow.xml, coordinator.xml, job properties, and HQL files. This script can handle many Hive actions in a workflow by organizing the metadata at the step level into separate folders. Each step includes the list of SQLs, the SQL paths, and their static parameters, which are input for the solution in the next step.

The process consists of two steps:

  1. The Python parser script takes as input the existing Oozie Hive job and its configuration files.
  2. The script generates a metadata JSON file for each step.

The following diagram outlines these steps and shows sample output.
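
To give a sense of what the parser does, the following is a minimal sketch (not the repository's script) that uses defusedxml to pull the Hive action names, script paths, and static parameters out of a workflow.xml. The element names follow the standard Oozie workflow and Hive action schemas; the real script also handles coordinator.xml and the job properties.

import defusedxml.ElementTree as ET

def extract_hive_actions(workflow_path):
    # Return (action_name, script_path, params) for each Hive action in the workflow.
    root = ET.parse(workflow_path).getroot()
    actions = []
    for action in root.iter():
        if not action.tag.endswith("action"):
            continue
        name = action.get("name")
        for child in action:
            if child.tag.split("}")[-1] != "hive":
                continue
            script, params = None, {}
            for elem in child:
                tag = elem.tag.split("}")[-1]
                if tag == "script":
                    script = elem.text
                elif tag == "param" and elem.text and "=" in elem.text:
                    key, value = elem.text.split("=", 1)
                    params[key] = value
            actions.append((name, script, params))
    return actions

for name, script, params in extract_hive_actions("workflow.xml"):
    print(name, script, params)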

Prerequisites

You need the following prerequisites:

  • Python 3.8
  • Python packages:
    • sqlparse==0.4.2
    • jproperties==2.1.1
    • defusedxml==0.7.1

Setup

Complete the following steps:

  1. Install Python 3.8.
  2. Create a virtual environment:
python3 -m venv /path/to/new/virtual/environment

  3. Activate the newly created virtual environment:
source /path/to/new/virtual/environment/bin/activate

  4. Git clone the project:
git clone https://github.com/aws-samples/oozie-job-parser-extract-hive-sql

  5. Install the dependent packages:
cd oozie-job-parser-extract-hive-sql
pip install -r requirements.txt

Sample command

We can use the following sample command:

python xml_parser.py --base-folder ./sample_jobs/ --job-name sample_oozie_job_name --job-version V3 --hive-action-version 0.4 --coordinator-action-version 0.4 --workflow-version 0.4 --properties-file-name job.coordinator.properties

The output is as follows:

{'nameNode': 'hdfs://@{{/cluster/${{cluster}}/namenode}}:54310', 'jobTracker': '@{{/cluster/${{cluster}}/jobtracker}}:54311', 'queueName': 'test_queue', 'appName': 'test_app', 'oozie.use.system.libpath': 'true', 'oozie.coord.application.path': '${nameNode}/user/${user.name}/apps/${appName}', 'oozie_app_path': '${oozie.coord.application.path}', 'start': '${{startDate}}', 'end': '${{endDate}}', 'initial_instance': '${{startDate}}', 'job_name': '${appName}', 'timeOut': '-1', 'concurrency': '3', 'execOrder': 'FIFO', 'throttle': '7', 'hiveMetaThrift': '@{{/cluster/${{cluster}}/hivemetastore}}', 'hiveMySQL': '@{{/cluster/${{cluster}}/hivemysql}}', 'zkQuorum': '@{{/cluster/${{cluster}}/zookeeper}}', 'flag': '_done', 'frequency': 'hourly', 'owner': 'who', 'SLA': '2:00', 'job_type': 'coordinator', 'sys_cat_id': '6', 'active': '1', 'data_file': 'hdfs://${nameNode}/hive/warehouse/test_schema/test_dataset', 'upstreamTriggerDir': '/input_trigger/upstream1'}

('./sample_jobs/development/sample_oozie_job_name/step1/step1.json', 'w')

('./sample_jobs/development/sample_oozie_job_name/step2/step2.json', 'w')

Limitations

This approach has the following limitations:

  • The Python script parses only HiveQL actions from the Oozie workflow.xml.
  • The Python script generates one file for each SQL statement and uses the sequence ID for file names. It doesn't name the SQL based on its functionality (a simplified illustration of this splitting follows).
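
For illustration only, statement splitting along these lines can be done with sqlparse (one of the listed prerequisites); the sequence-based file names below are a simplified stand-in for the parser's actual output.

import sqlparse

def split_hql(hql_path, output_dir="."):
    # Split an HQL file into per-statement files named by sequence ID (1.sql, 2.sql, ...).
    with open(hql_path) as f:
        statements = [s.strip() for s in sqlparse.split(f.read()) if s.strip()]
    for seq, statement in enumerate(statements, start=1):
        with open(f"{output_dir}/{seq}.sql", "w") as out:
            out.write(statement)
    return len(statements)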

Run Spark SQL on Amazon EMR

After we create the SQL metadata files, we use another automation script to run them with Spark SQL on Amazon EMR. This automation script supports custom UDFs by adding JAR files to spark submit. This solution uses DynamoDB for logging the run details of the SQLs for support and maintenance.
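
As a hedged example of what that UDF support can look like from the PySpark side, the following sketch registers a Java UDF class shipped in an external JAR (passed to spark-submit with --jars) and calls it from Spark SQL. The class and function names are placeholders, not part of the solution.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("udf-example").enableHiveSupport().getOrCreate()
# com.example.udf.MaskValue is a hypothetical UDF class provided in the external JAR.
spark.udf.registerJavaFunction("mask_value", "com.example.udf.MaskValue")
spark.sql("SELECT mask_value('sample-input')").show()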

Architecture overview

The following diagram illustrates the solution architecture.

Prerequisites

You need the following prerequisites:

  • Versions:
    • Spark 3.X
    • Python 3.8
    • Amazon EMR 6.1

Setup

Complete the following steps:

  1. Install the AWS Command Line Interface (AWS CLI) on your workspace by following the instructions in Installing or updating the latest version of the AWS CLI. To configure AWS CLI interaction with AWS, refer to Quick setup.
  2. Create two tables in DynamoDB: one to store metadata about jobs and steps, and another to log job runs.
    • Use the following AWS CLI command to create the metadata table in DynamoDB:
aws dynamodb create-table --region us-east-1 --table-name dw-etl-metadata --attribute-definitions '[ { "AttributeName": "id","AttributeType": "S" } , { "AttributeName": "step_id","AttributeType": "S" }]' --key-schema '[{"AttributeName": "id", "KeyType": "HASH"}, {"AttributeName": "step_id", "KeyType": "RANGE"}]' --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

You can check on the DynamoDB console that the table dw-etl-metadata is successfully created.

The metadata table has the following attributes.

Attribute Type Comments
id String partition_key
step_id String sort_key
step_name String Step description
sql_base_path String Base path
sql_info List List of SQLs in the ETL pipeline
. sql_path SQL file name
. sql_active_flag y/n
. sql_load_order Order of SQL
. sql_parameters Parameters in SQL and values
spark_config Map Spark configs
    • Use the following AWS CLI command to create the log table in DynamoDB:
aws dynamodb create-table --region us-east-1 --table-name dw-etl-pipelinelog --attribute-definitions '[ { "AttributeName":"job_run_id", "AttributeType": "S" } , { "AttributeName":"step_id", "AttributeType": "S" } ]' --key-schema '[{"AttributeName": "job_run_id", "KeyType": "HASH"},{"AttributeName": "step_id", "KeyType": "RANGE"}]' --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

You can check on the DynamoDB console that the table dw-etl-pipelinelog is successfully created.

The log table has the following attributes.

Attribute Type Comments
job_run_id String partition_key
id String sort_key (UUID)
end_time String End time
error_description String Error in case of failure
expire Number Time to Live
sql_seq Number SQL sequence number
start_time String Start time
status String Status of job
step_id String Job ID the SQL belongs to

The log table can grow quickly if there are too many jobs or if they run frequently. We can archive the records to Amazon S3 if they're no longer used, or use the Time to Live feature of DynamoDB to clean up old records.
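
For example, a minimal boto3 sketch (assuming default AWS credentials and Region) that enables TTL on the log table's expire attribute so old run records age out automatically:

import boto3

dynamodb = boto3.client("dynamodb")
dynamodb.update_time_to_live(
    TableName="dw-etl-pipelinelog",
    TimeToLiveSpecification={"Enabled": True, "AttributeName": "expire"},
)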

  3. Run the first command to set the variable if you have an existing bucket that can be reused. If not, create an S3 bucket to store the Spark SQL code, which will be run by Amazon EMR.
export s3_bucket_name=unique-code-bucket-name # Change unique-code-bucket-name to a valid bucket name

aws s3api create-bucket --bucket $s3_bucket_name --region us-east-1

  4. Enable secure transfer on the bucket:
aws s3api put-bucket-policy --bucket $s3_bucket_name --policy '{"Version": "2012-10-17", "Statement": [{"Effect": "Deny", "Principal": {"AWS": "*"}, "Action": "s3:*", "Resource": ["arn:aws:s3:::unique-code-bucket-name", "arn:aws:s3:::unique-code-bucket-name/*"], "Condition": {"Bool": {"aws:SecureTransport": "false"} } } ] }' # Change unique-code-bucket-name to a valid bucket name

  5. Clone the project to your workspace:
git clone https://github.com/aws-samples/pyspark-sql-framework.git

  6. Create a ZIP file and upload it to the code bucket created earlier:
cd pyspark-sql-framework/code
zip code.zip -r *
aws s3 cp ./code.zip s3://$s3_bucket_name/framework/code.zip

  7. Upload the ETL driver code to the S3 bucket:
cd $OLDPWD/pyspark-sql-framework
aws s3 cp ./code/etl_driver.py s3://$s3_bucket_name/framework/

  8. Upload the sample job SQLs to Amazon S3:
aws s3 cp ./sample_oozie_job_name/ s3://$s3_bucket_name/DW/sample_oozie_job_name/ --recursive

  9. Add a sample step (./sample_oozie_job_name/step1/step1.json) to DynamoDB (for more information, refer to Write data to a table using the console or AWS CLI):
{
  "call": "step1.q",
  "step_name": "step1",
  "sql_info": [
    {
      "sql_load_order": 5,
      "sql_parameters": {
        "DATE": "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}",
        "HOUR": "${coord:formatTime(coord:nominalTime(), 'HH')}"
      },
      "sql_active_flag": "Y",
      "sql_path": "5.sql"
    },
    {
      "sql_load_order": 10,
      "sql_parameters": {
        "DATE": "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}",
        "HOUR": "${coord:formatTime(coord:nominalTime(), 'HH')}"
      },
      "sql_active_flag": "Y",
      "sql_path": "10.sql"
    }
  ],
  "identity": "emr_config",
  "step_id": "sample_oozie_job_name#step1",
  "sql_base_path": "sample_oozie_job_name/step1/",
  "spark_config": {
    "spark.sql.parser.quotedRegexColumnNames": "true"
  }
}
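
If you prefer to load this item with a script instead of the console, a minimal boto3 sketch (assuming default credentials and the dw-etl-metadata table created earlier) could look like this:

import json
import boto3

# Write the sample step definition into the DynamoDB metadata table.
table = boto3.resource("dynamodb").Table("dw-etl-metadata")
with open("./sample_oozie_job_name/step1/step1.json") as f:
    item = json.load(f)
table.put_item(Item=item)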

  10. In the Athena query editor, create the database base (for example, by running create database base;).
  11. Copy the sample data files from the repo to Amazon S3:
    1. Copy us_current.csv:
aws s3 cp ./sample_data/us_current.csv s3://$s3_bucket_name/covid-19-testing-data/base/source_us_current/;

    2. Copy states_current.csv:
aws s3 cp ./sample_data/states_current.csv s3://$s3_bucket_name/covid-19-testing-data/base/source_states_current/;

  12. To create the source tables in the base database, run the DDLs present in the repo in the Athena query editor:
    1. Run the ./sample_data/ddl/states_current.q file after modifying the S3 path to the bucket you created.
    2. Run the ./sample_data/ddl/us_current.q file after modifying the S3 path to the bucket you created.

The ETL driver file implements the Spark driver logic. It can be invoked locally or on an EMR instance.
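
The following is a simplified sketch, under assumptions about key names and a plain ${PARAM} substitution convention, of the kind of flow the driver implements: read the step's metadata from DynamoDB, apply the Spark configs, and run each active SQL from Amazon S3 in load order inside one Spark session. The actual etl_driver.py also writes start, end, and status entries to the log table.

import boto3
from pyspark.sql import SparkSession

def run_step(step_id, metadata_table, code_bucket, sql_parameters):
    spark = SparkSession.builder.appName(step_id).enableHiveSupport().getOrCreate()
    table = boto3.resource("dynamodb").Table(metadata_table)
    # Assumes the id/step_id keys shown in the sample metadata item above.
    item = table.get_item(Key={"id": "emr_config", "step_id": step_id})["Item"]
    for cfg_key, cfg_value in item.get("spark_config", {}).items():
        spark.conf.set(cfg_key, cfg_value)
    s3 = boto3.client("s3")
    bucket, prefix = code_bucket.replace("s3://", "").split("/", 1)
    for sql in sorted(item["sql_info"], key=lambda s: int(s["sql_load_order"])):
        if sql["sql_active_flag"].upper() != "Y":
            continue
        key = f'{prefix}/{item["sql_base_path"]}{sql["sql_path"]}'
        text = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
        for name, value in sql_parameters.items():
            text = text.replace("${" + name + "}", value)  # illustrative substitution only
        spark.sql(text)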

  13. Launch an EMR cluster.
    1. Make sure to select Use for Spark table metadata under AWS Glue Data Catalog settings.

  14. Add the following steps to the EMR cluster.
aws emr add-steps --cluster-id <<cluster id created above>> --steps 'Type=CUSTOM_JAR,Name="boto3",ActionOnFailure=CONTINUE,Jar=command-runner.jar,Args=[bash,-c,"sudo pip3 install boto3"]'
aws emr add-steps --cluster-id <<cluster id created above>> --steps 'Name="sample_oozie_job_name",Jar="command-runner.jar",Args=[spark-submit,--py-files,s3://unique-code-bucket-name-#####/framework/code.zip,s3://unique-code-bucket-name-#####/framework/etl_driver.py,--step_id,sample_oozie_job_name#step1,--job_run_id,sample_oozie_job_name#step1#2022-01-01-12-00-01,  --code_bucket=s3://unique-code-bucket-name-#####/DW,--metadata_table=dw-etl-metadata,--log_table_name=dw-etl-pipelinelog,--sql_parameters,DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#####]' # Change unique-code-bucket-name to a valid bucket name

The following table summarizes the parameters for the Spark step.

Step type Spark application
Name Any name
Deploy mode Client
Spark-submit options --py-files s3://unique-code-bucket-name-#####/framework/code.zip
Application location s3://unique-code-bucket-name-####/framework/etl_driver.py
Arguments --step_id sample_oozie_job_name#step1 --job_run_id sample_oozie_job_name#step1#2022-01-01-12-00-01 --code_bucket=s3://unique-code-bucket-name-#######/DW --metadata_table=dw-etl-metadata --log_table_name=dw-etl-pipelinelog --sql_parameters DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#######
Action on failure Continue

The following table summarizes the script arguments.

Script argument Argument Description
deploy-mode Spark deploy mode. Client/Cluster.
name <jobname>#<stepname> Unique name for the Spark job. This can be used to identify the job on the Spark History UI.
py-files <s3 path for code>/code.zip S3 path for the code.
<s3 path for code>/etl_driver.py S3 path for the driver module. This is the entry point for the solution.
step_id <jobname>#<stepname> Unique name for the step. This refers to the step_id in the metadata entered in DynamoDB.
job_run_id <random UUID> Unique ID to identify the log entries in DynamoDB.
log_table_name <DynamoDB log table name> DynamoDB table for storing the job run details.
code_bucket <s3 bucket> S3 path for the SQL files that are uploaded in the job setup.
metadata_table <DynamoDB metadata table name> DynamoDB table for storing the job metadata.
sql_parameters DATE=2022-07-04::HOUR=00 Any additional or dynamic parameters expected by the SQL files.
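
The sql_parameters string packs key-value pairs separated by ::. A hedged sketch of how such an argument might be turned into a dictionary for substitution (the driver's actual parsing may differ):

def parse_sql_parameters(raw):
    # "DATE=2022-07-04::HOUR=00" -> {"DATE": "2022-07-04", "HOUR": "00"}
    params = {}
    for pair in raw.split("::"):
        if "=" in pair:
            key, value = pair.split("=", 1)
            params[key] = value
    return params

print(parse_sql_parameters("DATE=2022-07-04::HOUR=00"))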

Validation

After completion of the EMR step, you should have data in the S3 bucket for the table base.states_daily. We can validate the data by querying the table base.states_daily in Athena (for example, select * from base.states_daily limit 10).

Congratulations, you have converted an Oozie Hive job to Spark SQL and run it successfully on Amazon EMR.

Solution highlights

This solution has the following benefits:

  • It avoids boilerplate code for any new pipeline and requires less code maintenance
  • Onboarding any new pipeline only needs the metadata setup: the DynamoDB entries and the SQL files placed in the S3 path
  • Any common modifications or enhancements can be made at the solution level and are reflected across all jobs
  • DynamoDB metadata provides insight into all active jobs and their optimized runtime parameters
  • For each run, this solution persists the SQL start time, end time, and status in a log table to identify issues and analyze runtimes
  • It supports Spark SQL and UDF functionality. Custom UDFs can be provided externally to the spark submit command

Limitations

This approach has the following limitations:

  • The solution only supports SQL queries on Spark
  • Each SQL must be a Spark action like insert, create, or drop

In this post, we described the scenario of migrating an existing Oozie job. We can use the PySpark solution independently for any new development by creating DynamoDB entries and SQL files.

Clean up

Delete all the resources created as part of this solution to avoid ongoing charges:

  1. Delete the DynamoDB tables:
aws dynamodb delete-table --table-name dw-etl-metadata --region us-east-1
aws dynamodb delete-table --table-name dw-etl-pipelinelog --region us-east-1

  2. Delete the S3 bucket:
aws s3 rm s3://$s3_bucket_name --region us-east-1 --recursive
aws s3api delete-bucket --bucket $s3_bucket_name --region us-east-1

  3. Terminate the EMR cluster if it wasn't a transient cluster:
aws emr terminate-clusters --cluster-ids <<cluster identity created above>> 

Conclusion

In this post, we presented two automated solutions: one for parsing Oozie workflows and HiveQL files to generate metadata, and a PySpark solution for running SQLs using the generated metadata. We successfully implemented these solutions to migrate a Hive workload to EMR Spark for a major gaming customer and achieved about 60% effort reduction.

For a Hive with Oozie to Spark migration, these solutions help complete the code conversion quickly so you can focus on performance benchmarking and testing. Developing a new pipeline is also quick: you only need to create the SQL logic, test it using Spark (shell or notebook), add the metadata to DynamoDB, and test via the PySpark SQL solution. Overall, you can use the solution in this post to accelerate Hive to Spark code migration.


About the authors

Vinay Kumar Khambhampati is a Lead Consultant with the AWS ProServe Team, helping customers with cloud adoption. He is passionate about big data and data analytics.

Sandeep Singh is a Lead Consultant at AWS ProServe, focused on analytics, data lake architecture, and implementation. He helps enterprise customers migrate and modernize their data lake and data warehouse using AWS services.

Amol Guldagad is a Data Analytics Consultant based in India. He has worked with customers in different industries like banking and financial services, healthcare, power and utilities, manufacturing, and retail, helping them solve complex challenges with large-scale data platforms. At AWS ProServe, he helps customers accelerate their journey to the cloud and innovate using AWS analytics services.
