Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify OMSSparkConf implementation #19

Open
himanishk opened this issue Oct 4, 2021 · 0 comments
Open

Simplify OMSSparkConf implementation #19

himanishk opened this issue Oct 4, 2021 · 0 comments
Labels
enhancement New feature or request

Comments

@himanishk
Copy link
Contributor

You could simplify this a bit with an internal mapping case class between the Spark conf and the OMS conf, which also lets you mark required fields — something like this. Alternatively, you can avoid maintaining this internal mapping altogether by refactoring your original case class OMSConfig to use this approach directly.

/*
 * Copyright (2021) Databricks, Inc.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
 * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE
 * AND NONINFRINGEMENT.
 *
 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
 * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * See the Full License for the specific language governing permissions and
 * limitations under the License.
 */

package com.databricks.labs.deltaoms.common

import com.databricks.labs.deltaoms.common.OMSSparkConfUtils.{buildConfKey, getSparkConf}
import com.databricks.labs.deltaoms.configuration.{OMSConfig, SparkSettings}

/** Internal helpers for reading OMS settings out of the active Spark session conf. */
private object OMSSparkConfUtils extends SparkSettings {
  /** Prefixes `key` with the namespace used by every OMS Spark conf entry. */
  def buildConfKey(key: String): String = s"databricks.labs.deltaoms.${key}"

  /** Looks up `confKey` in the Spark session conf; `None` when the key is unset.
   * The unused type parameter `T` was removed — the result is always `Option[String]`,
   * and all call sites invoke this without an explicit type argument. */
  def getSparkConf(confKey: String): Option[String] = {
    spark.conf.getOption(confKey)
  }
}

/** Pairs a configuration value with the Spark conf key it can be overridden by.
 *
 * @param value           the current (default or resolved) setting
 * @param sparkConfigName the fully-qualified Spark conf key for this setting
 * @param is_required     whether the setting must ultimately be provided
 */
case class WithSparkConf[T](
    value: T,
    sparkConfigName: String,
    is_required: Boolean = false)


/** Declarative mapping between OMS configuration fields and their Spark conf keys.
 *
 * Each field carries its default value, the `databricks.labs.deltaoms.*` Spark conf
 * key that can override it (built via [[OMSSparkConfUtils.buildConfKey]]), and a
 * required flag. Fields with `is_required = true` have no usable default and must
 * be supplied by the user (directly or through Spark conf).
 */
case class OMSSparkConfig(baseLocation: WithSparkConf[Option[String]] =
                            WithSparkConf(None,
                              buildConfKey("base.location"),
                              is_required = true),
                          dbName: WithSparkConf[Option[String]] =
                          WithSparkConf(None,
                            buildConfKey("db.name"),
                            is_required = true),
                          checkpointBase: WithSparkConf[Option[String]] =
                            WithSparkConf(None,
                              buildConfKey("checkpoint.base"),
                              is_required = true),
                          checkpointSuffix: WithSparkConf[Option[String]] =
                            WithSparkConf(None,
                              buildConfKey("checkpoint.suffix"),
                              is_required = true),
                          // Table-name settings below are optional and default to
                          // the standard OMS table names.
                          rawActionTable: WithSparkConf[String] =
                            WithSparkConf("rawactions",
                              buildConfKey("raw.action.table")),
                          sourceConfigTable: WithSparkConf[String] =
                            WithSparkConf("sourceconfig",
                              buildConfKey("source.config.table")),
                          pathConfigTable: WithSparkConf[String] =
                            WithSparkConf("pathconfig",
                              buildConfKey("path.config.table")),
                          processedHistoryTable: WithSparkConf[String] =
                            WithSparkConf("processedhistory",
                              buildConfKey("processed.history.table")),
                          commitInfoSnapshotTable: WithSparkConf[String] =
                            WithSparkConf("commitinfosnapshots",
                              buildConfKey("commitinfo.snapshot.table")),
                          actionSnapshotTable: WithSparkConf[String] =
                            WithSparkConf("actionsnapshots",
                              buildConfKey("action.snapshot.table")),
                          // Behavioral toggles; conf values are parsed with
                          // `.toBoolean` by the consumer.
                          consolidateWildcardPaths: WithSparkConf[Boolean] =
                            WithSparkConf(true,
                              buildConfKey("consolidate.wildcard.paths")),
                          truncatePathConfig: WithSparkConf[Boolean] =
                            WithSparkConf(false,
                              buildConfKey("truncate.path.config")),
                          skipPathConfig: WithSparkConf[Boolean] =
                            WithSparkConf(false,
                              buildConfKey("skip.path.config")),
                          skipInitializeOMS: WithSparkConf[Boolean] =
                            WithSparkConf(false,
                              buildConfKey("skip.initialize")),
                          // Optional source-selection filters.
                          srcDatabases: WithSparkConf[Option[String]] =
                            WithSparkConf(None,
                              buildConfKey("src.databases")),
                          tablePattern: WithSparkConf[Option[String]] =
                            WithSparkConf(None,
                              buildConfKey("table.pattern")),
                          triggerInterval: WithSparkConf[Option[String]] =
                            WithSparkConf(None,
                              buildConfKey("trigger.interval")),
                          // Stream index bounds; conf values are parsed with `.toInt`.
                          startingStream: WithSparkConf[Int] =
                            WithSparkConf(1,
                              buildConfKey("starting.stream")),
                          endingStream: WithSparkConf[Int] =
                            WithSparkConf(50,
                              buildConfKey("ending.stream")))

trait OMSSparkConf extends Serializable with SparkSettings {

  /** Overlays Spark session conf values onto an existing [[OMSConfig]].
   *
   * For each field, if the corresponding `databricks.labs.deltaoms.*` Spark conf
   * key is set, its parsed value wins; otherwise the value already present in
   * `config` is kept (via `Option.fold`, whose first argument is the fallback).
   *
   * NOTE(review): `_.toBoolean` / `_.toInt` throw `IllegalArgumentException` /
   * `NumberFormatException` on malformed conf values — confirm whether a
   * friendlier validation error is wanted here.
   *
   * @param config the baseline configuration to overlay
   * @return a new `OMSConfig` with Spark conf overrides applied
   */
  def consolidateOMSConfigFromSparkConf(config: OMSConfig): OMSConfig = {
    val sparkOmsConfMap = OMSSparkConfig()
    OMSConfig(
      baseLocation = getSparkConf(sparkOmsConfMap.baseLocation.sparkConfigName)
        .fold(config.baseLocation){Some(_)},
      dbName = getSparkConf(sparkOmsConfMap.dbName.sparkConfigName)
        .fold(config.dbName){Some(_)},
      checkpointBase = getSparkConf(sparkOmsConfMap.checkpointBase.sparkConfigName)
        .fold(config.checkpointBase){Some(_)},
      checkpointSuffix = getSparkConf(sparkOmsConfMap.checkpointSuffix.sparkConfigName)
        .fold(config.checkpointSuffix){Some(_)},
      rawActionTable = getSparkConf(sparkOmsConfMap.rawActionTable.sparkConfigName)
        .fold(config.rawActionTable){identity},
      sourceConfigTable = getSparkConf(sparkOmsConfMap.sourceConfigTable.sparkConfigName)
        .fold(config.sourceConfigTable){identity},
      pathConfigTable = getSparkConf(sparkOmsConfMap.pathConfigTable.sparkConfigName)
        .fold(config.pathConfigTable){identity},
      processedHistoryTable = getSparkConf(sparkOmsConfMap.processedHistoryTable.sparkConfigName)
        .fold(config.processedHistoryTable){identity},
      commitInfoSnapshotTable = getSparkConf(sparkOmsConfMap.commitInfoSnapshotTable.sparkConfigName)
        .fold(config.commitInfoSnapshotTable){identity},
      actionSnapshotTable = getSparkConf(sparkOmsConfMap.actionSnapshotTable.sparkConfigName)
        .fold(config.actionSnapshotTable){identity},
      consolidateWildcardPaths = getSparkConf(sparkOmsConfMap.consolidateWildcardPaths.sparkConfigName)
        .fold(config.consolidateWildcardPaths){_.toBoolean},
      truncatePathConfig = getSparkConf(sparkOmsConfMap.truncatePathConfig.sparkConfigName)
        .fold(config.truncatePathConfig){_.toBoolean},
      skipPathConfig = getSparkConf(sparkOmsConfMap.skipPathConfig.sparkConfigName)
        .fold(config.skipPathConfig){_.toBoolean},
      skipInitializeOMS = getSparkConf(sparkOmsConfMap.skipInitializeOMS.sparkConfigName)
        .fold(config.skipInitializeOMS){_.toBoolean},
      srcDatabases = getSparkConf(sparkOmsConfMap.srcDatabases.sparkConfigName)
        .fold(config.srcDatabases){Some(_)},
      tablePattern = getSparkConf(sparkOmsConfMap.tablePattern.sparkConfigName)
        .fold(config.tablePattern){Some(_)},
      triggerInterval = getSparkConf(sparkOmsConfMap.triggerInterval.sparkConfigName)
        .fold(config.triggerInterval){Some(_)},
      startingStream = getSparkConf(sparkOmsConfMap.startingStream.sparkConfigName)
        .fold(config.startingStream){_.toInt},
      endingStream = getSparkConf(sparkOmsConfMap.endingStream.sparkConfigName)
        .fold(config.endingStream){_.toInt}
    ) // BUGFIX: the original snippet never closed the OMSConfig(...) argument list
  }
}

// Singleton entry point so callers can use OMSSparkConf.consolidateOMSConfigFromSparkConf
// without mixing the trait in themselves.
object OMSSparkConf extends OMSSparkConf

Originally posted by @bali0019 in #17 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

1 participant