Ov 404 simplify upgrade process v1 #681
base: 0730_release
Conversation
SonarCloud Quality Gate failed. 0 Bugs. No Coverage information.
override def upgrade(): DataFrame = {
  val tempDir = s"/tmp/overwatch/upgrade0610_status__ctrl_0x111/${System.currentTimeMillis()}"
This prefix should be taken from prodWorkspace.getConfig.tempWorkingDir by default, but it should be optionally overridable in the constructor. Please default it to a null string so the user doesn't have to pass it in.
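A minimal sketch of that constructor pattern; `WorkspaceConfig` and its `tempWorkingDir` field are illustrative stand-ins for the real Overwatch types, not the actual API:

```scala
// Sketch only: WorkspaceConfig stands in for the real prodWorkspace config type.
case class WorkspaceConfig(tempWorkingDir: String)

// tempDirOverride defaults to null so callers don't have to pass it;
// when null, the prefix falls back to the config's tempWorkingDir.
class UpgradeTo0610(prodConfig: WorkspaceConfig, tempDirOverride: String = null) {
  val tempDir: String =
    if (tempDirOverride == null)
      s"${prodConfig.tempWorkingDir}/upgrade0610_status/${System.currentTimeMillis()}"
    else tempDirOverride
}
```

With the default, `new UpgradeTo0610(config)` derives the path from the config; passing a second argument overrides it entirely.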
val etlDatabaseName = prodWorkspace.database.getDatabaseName
dbutils.fs.mkdirs(tempDir) // init tempDir -- if no errors it wouldn't be created
val blankConfig = new Config()
val currentSchemaVersion = SchemaTools.getSchemaVersion(etlDatabaseName)
val numericalSchemaVersion = getNumericalSchemaVersion(currentSchemaVersion)
val targetSchemaVersion = "0.610"
validateSchemaUpgradeEligibility(currentSchemaVersion, targetSchemaVersion)
validateNumericalSchemaVersion(numericalSchemaVersion, 600, 610)
val upgradeStatus: ArrayBuffer[UpgradeReport] = ArrayBuffer()
val dbrVersion = spark.conf.get("spark.databricks.clusterUsageTags.effectiveSparkVersion")
val dbrMajorV = dbrVersion.split("\\.").head
val dbrMinorV = dbrVersion.split("\\.")(1)
val dbrVersionNumerical = s"$dbrMajorV.$dbrMinorV".toDouble
val initialSourceVersions: concurrent.Map[String, Long] = new ConcurrentHashMap[String, Long]().asScala
val packageVersion = blankConfig.getClass.getPackage.getImplementationVersion.replaceAll("\\.", "").tail.toInt
val startingSchemaVersion = SchemaTools.getSchemaVersion(etlDatabaseName).split("\\.").takeRight(1).head.toInt
validateSchemaAndPackageVersion(startingSchemaVersion, packageVersion, 600, 610)
Most of these items are common across all upgrades, such as the source and target versions. The only exception is the DBR-version logic, which is specific to the 0610 upgrade. Since all of these vals are derived, can they be moved to the abstract class so they don't have to be defined every time?
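A hedged sketch of what pulling the common derived values into an abstract base might look like; all names here (`UpgradeHandler`, `minSchemaVersion`, `maxSchemaVersion`) are hypothetical, not the existing Overwatch classes:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.concurrent
import scala.collection.mutable.ArrayBuffer

// Sketch only: a stand-in for the real UpgradeReport case class.
case class UpgradeReport(step: Int, message: String)

abstract class UpgradeHandler {
  // each concrete upgrade declares only its version bounds
  def minSchemaVersion: Int
  def maxSchemaVersion: Int

  // common derived state defined once instead of per upgrade
  val upgradeStatus: ArrayBuffer[UpgradeReport] = ArrayBuffer()
  val initialSourceVersions: concurrent.Map[String, Long] =
    new ConcurrentHashMap[String, Long]().asScala
  lazy val targetSchemaVersion: String = s"0.$maxSchemaVersion"
}

class UpgradeTo0610 extends UpgradeHandler {
  val minSchemaVersion = 600
  val maxSchemaVersion = 610
  // only the 0610-specific DBR version checks would live here
}
```

The concrete class then shrinks to its version bounds plus whatever is genuinely upgrade-specific.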
(startStep to endStep).foreach {
  case 1 =>
    val stepMsg = Some("Step 1: Upgrade Schema - Job Status Silver")
I want to try to clean this up so that all upgrades ultimately submit a map of step number to upgradeStepFunction to the UpgradeHandler. Hopefully it can look something like:

private def 0610Step1(...) = ???
private def 0610Step2(...) = ???
val upgradeMap = Map(stepNumber -> upgradeFunc, ...)
upgrade(upgradeMap)

The above is just pseudocode, obviously, but hopefully the idea is clear: the dev creates n functions and then passes those to the upgrade. I think this will make the code much cleaner and easier to navigate. Thoughts?
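A compilable sketch of that step-map idea (names are hypothetical, and since Scala identifiers can't start with a digit, the steps are named `step1`/`step2` rather than `0610Step1`):

```scala
// Sketch only: each upgrade registers its step functions in a Map and a
// shared driver walks the requested range.
class Upgrade0610 {
  private def step1(): Unit = { /* Upgrade Schema - Job Status Silver */ }
  private def step2(): Unit = { /* next step */ }

  // the dev's only job: declare the step functions...
  val upgradeMap: Map[Int, () => Unit] =
    Map(1 -> (() => step1()), 2 -> (() => step2()))

  // ...which a common driver (here inlined) executes in order
  def upgrade(startStep: Int, endStep: Int): Seq[Int] =
    (startStep to endStep).map { n => upgradeMap(n)(); n }
}
```

Returning the executed step numbers is just for illustration; the real driver would presumably accumulate UpgradeReports instead.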
Inside the upgrade handler, all exception handling should be hidden from the upgrade author: the try / catch / finally logic belongs in the abstract class or executor, not in the actual upgrade script.
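A minimal sketch of that separation, assuming a hypothetical `UpgradeRunner` in the executor layer; the concrete upgrade then contributes only the happy-path step function:

```scala
// Sketch only: the runner owns try/catch/finally so concrete upgrade
// scripts contain no error-handling boilerplate.
object UpgradeRunner {
  def runStep(stepNumber: Int, stepFunc: () => Unit): Either[String, Int] =
    try {
      stepFunc()
      Right(stepNumber) // step succeeded
    } catch {
      case e: Exception => Left(s"Step $stepNumber failed: ${e.getMessage}")
    } finally {
      // shared cleanup (e.g. dropping temp tables) would go here
    }
}
```

Each step's outcome comes back as an `Either`, so the driver can log failures into the upgrade status without any try/catch in the step bodies themselves.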
checkIfTargetExists(etlDatabaseName, targetName)
initialSourceVersions.put(targetName, Helpers.getLatestTableVersionByName(s"${etlDatabaseName}.${targetName}"))
val jobSilverDF = spark.table(s"${etlDatabaseName}.${targetName}")
removeNestedColumnsAndSaveAsTable(jobSilverDF, "new_settings", Array("tasks", "job_clusters"), etlDatabaseName, targetName)
This should be the entirety of the step 1 upgrade func for the 0610 upgrade. Hopefully this is all the code a dev building the upgrade needs to write to complete 0610 step 1.
class UpgradeExecutor(val etldbname: String, val fromVersion: Int, val toVersion: Int, val executeSingleStep: Int = -1) {

  val versionMap = Map("600-609" -> classOf[UpgradeTo0610], "610-699" -> classOf[UpgradeTo0700])
Would you have a versionMap for each upgrade path, or is this just an example covering one of them?
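One way a single versionMap could cover every path is to key it on numerical version ranges rather than strings, and have the executor resolve the right upgrade class; this is a hypothetical sketch with stub classes, not the actual Overwatch types:

```scala
// Sketch only: stub upgrade classes standing in for the real ones.
class UpgradeTo0610
class UpgradeTo0700

object UpgradeResolver {
  // one map for all upgrade paths, keyed by the numerical source-version range
  val versionMap: Map[Range, Class[_]] = Map(
    (600 to 609) -> classOf[UpgradeTo0610],
    (610 to 699) -> classOf[UpgradeTo0700]
  )

  // the executor picks the class whose range contains the starting version
  def resolve(fromVersion: Int): Option[Class[_]] =
    versionMap.collectFirst { case (range, cls) if range.contains(fromVersion) => cls }
}
```

`resolve` returning `None` for an out-of-range version gives the executor a natural place to fail fast with a clear "no upgrade path" message.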
* initial commit
* Changing Serverless to High-Concurrency (#706)
* Changing Serverless to High-Concurrency
* minor changes --------- Co-authored-by: Sourav Banerjee <[email protected]> Co-authored-by: Daniel Tomes <[email protected]>
* Spark events bronze par session (#678)
* initial commit
* Test Spark per session
* notes from meeting
* persession implemented (×6)
* pr review implemented
* 686 - SparkEvents Executor ID Schema Handler
* global Session - initialize configs
* Concurrent Writes - Table Lock for Parallelized Loads (#691)
* initi commit -- working
* code modified preWriteActions added
* re-instanted perform retry for legacy deployments and added comments
* added logging details
* clear sessionsMap on batch runner --------- Co-authored-by: sriram251-code <[email protected]>
* minor fixes --------- Co-authored-by: Daniel Tomes <[email protected]> Co-authored-by: geeksheikh <[email protected]>
* adding new snapshots to bronze layer (#684)
* adding new snapshots to bronze layer
* changed all the single asset name to plural
* adding transform function for new bronze snaps
* changes applied to improve schema quality --------- Co-authored-by: Daniel Tomes <[email protected]>
* apiURL "/" removed and dbsql added (#699)
* bug fix (×4)
* Improve acquisition of Cloud Provider and OrgID (#708)
* Improve acquisition of Cloud Provider and OrgID
* Improve acquisition of Cloud Provider and OrgID
* Modularize getOrgID function
* removed old commented version of code --------- Co-authored-by: Sourav Banerjee <[email protected]> Co-authored-by: Daniel Tomes <[email protected]>
* Update SilverTransforms.scala (#703)
* Overwatch on photon broadcast exchange perf issue (#705)
* Change the Join to Shuffle_hash Join for collectEventLogPaths
* Change the Join to Shuffle_hash Join for collectEventLogPaths
* adding pagination logic for job-runs api (#723)
* 729 - enable clusterEvents merge Insert (#730)
* enable clusterEvents merge Insert
* added comments --------- Co-authored-by: geeksheikh <[email protected]>
* auditRaw - mergeInsertOnly (#738)
* enable clusterEvents merge Insert
* added comments
* 737 - dateGlobFix and auditLogRaw mergeInserts --------- Co-authored-by: geeksheikh <[email protected]>
* implemented (#752)
* asStrings implemented for apicallv2 (#707)
* 749 fill meta improved (#753)
* 749 fill meta improved
* put tsPartVal in clsf back to 16 --------- Co-authored-by: geeksheikh <[email protected]>
* cleaning jobs_snap_bronze new_cluster field (#732) Co-authored-by: geeksheikh <[email protected]>
* dbu and cost calculations fixed (#760)
* dbu calcs corrected
* readd aliases
* add runtime_engine to fillforward
* added a few comments to funcs
* corrected workerDBU Cost Value
* enabled remote getWorkspaceByDatabase (#754) Co-authored-by: geeksheikh <[email protected]>
* improved first run Impute clusterSpec (#759)
* excluded scope enhanced (#740)
* excluded scope enhanced
* review comment implemented
* modified lowerCase logic --------- Co-authored-by: Daniel Tomes <[email protected]>
* adding temp location, start and end time in jobs runs list api (#755)
* adding temp location, start and end time in jobs runs list api
* change in jobRunsList api call
* removed default apiVersion from new apply method
* adding fix for jobs runs list api
* adding code to cleanse duplicate cols in JobRunsList transform, and added new bronze snapshots in target
* reading mount source from csv implemented (#695)
* reading mount source from csv implemented
* driver workspace should not call search/mount to get source
* review comment implemented
* review comment implemented
* Reading config from delta implemented. (#713)
* reading config from delta implemented. skip mount point check for AWS added.
* review comment implemented (×5)
* merge conflict removed
* shuffle partition changed to String (#717)
* shuffle partition changed to String
* comments implemented (×7)
* test cases added
* test cases added
* adding generic api calls function (#756)
* adding generic api calls function
* adding an empty map as return in APIMeta Trait for def getAPIJsonQuery
* adding function getParallelAPIParams
* implemented code review comments
* removed commented lines
* one workspace instance per workspace deployment (#774)
* adding cluster type in jrcp view (#778)
* improved spark conf handler and optimized confs (#773)
* mount mapping validation added (#777)
* mount mapping validation added
* review comments implemented (×4)
* Integration Testing - Bug Fixes (#782)
* added persistAndLoad to all writes with tableLocking
* dont perform data validation if path validation fails -- protrects first run failures especially
* fix excludedScopes
* null config handlers plus proxy scope,key error handler
* cleanup
* debugging
* fixed futures executions
* additional fixes
* dbu cost fix
* getOrgID bug fix
* target exists enhancement for delta target path validation
* getWorkspaceByDatabase -- cross-cloud remote workspace enabled
* added experimental flag to jrsnapshot and enabled manual module disabling
* rollback and module mapping --------- Co-authored-by: geeksheikh <[email protected]> Co-authored-by: Sourav Banerjee <[email protected]> Co-authored-by: Sourav Banerjee <[email protected]> Co-authored-by: Sriram Mohanty <[email protected]> Co-authored-by: Aman <[email protected]>
* initial commit
* 788 bug aws single tenant multiworkspace deployment noneget (#798)
* Change Validate for MWS logic
* removed debug print --------- Co-authored-by: sourav.banerjee <[email protected]>
* disable autoOptimizeShuffle (#795)
* disable autoOptimizeShuffle
* re-enabled optimizeShuffleParts
* fixed exists tests (#799)
* Enable API Error Reporting to PipReport and Safe Failure (#796)
* possible solution
* complete implementation
* improved runtime calcs
* improved fail message stdout --------- Co-authored-by: sourav.banerjee <[email protected]>
* initial commit
* Refractor InitializerFunctions.scala
* Refractor InitializerFunctions.scala
* Change Scala Sources Name
* Refractor InitializerFunctions.scala
* Refractor InitializerFunctions.scala
* Added Initializerv2.scala
* Added Initializerv2.scala
* Changed as per Sriram comment
* Changed as per Sriram comment
* dropped Initializer Deprecated --------- Co-authored-by: geeksheikh <[email protected]> Co-authored-by: Sourav Banerjee <[email protected]> Co-authored-by: Daniel Tomes <[email protected]>
* initial commit
* Refractor Initializer (#683)
* Change Job Trigger type to Triggered
* Change Job Trigger type to Triggered
* Change Job Trigger type to Triggered --------- Co-authored-by: geeksheikh <[email protected]> Co-authored-by: Sourav Banerjee <[email protected]> Co-authored-by: Daniel Tomes <[email protected]>
* initial commit
* Refractor Initializer (#683)
* gcp integration added
* gcp integration added
* minor updates from daniel
* review comment implemented --------- Co-authored-by: geeksheikh <[email protected]> Co-authored-by: Sourav Banerjee <[email protected]> Co-authored-by: Sourav Banerjee <[email protected]> Co-authored-by: Daniel Tomes <[email protected]>
* conde changes completed
* column name changed from etl_storage_prefix to storage prefix --------- Co-authored-by: geeksheikh <[email protected]>
…er. (#679) Co-authored-by: Carson Wilkins <[email protected]>
* code changes completed (×4)
* initial 0713 commit
* handled null AccumUpates Co-authored-by: Sourav Banerjee <[email protected]>
* api namespace change
* review comments implemented
* getParams implemented
* review comments implemented
* code implemeted
* documentation added
* implemented
* review comments implemented --------- Co-authored-by: Daniel Tomes <[email protected]>
* Added Logic to create PipReport
* suggested approach for overriding signatures
* added a few enhancements
* minor updates
* removed table import and updated table referene to spark.table --------- Co-authored-by: Sourav Banerjee <[email protected]> Co-authored-by: Daniel Tomes <[email protected]>
* Update azure/aws_instance_details.csv
* Update Azure_instance_details.csv: Updated F4 to F4s for the instance Standard_F4s. Similarly for F8s and F16s.
closes #404
No description provided.