JOTTINGS-Ubuntu16.04 LTS complete clone

2023-01-23   ES  

spark-dataframe data Insert MySQL performance optimization

recently used Spark for data processing in the company project. The results of the data are written to MySQL or TIDB. The results obtained by SPARK after finishing a series of RDD operations are inserted into the data through JDBC, but the inserted data is very slow. Start studying this code and find performance optimization.

spark made us encapsulate, and the code inserted into MySQL is very simple. Just call the spark’s API

       .option("url",getValueOfPrefix(prefix,"url"))  // Database connection address
       .option("isolationLevel","NONE")  // Do not turn on transactions
       .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE,150)  // Set batch size
       .option("dbtable", tableName)  // Insert table
       .option("user",getValueOfPrefix(prefix,"username"))  // database user name
       .option("password",getValueOfPrefix(prefix,"password"))  // database password 

 The code above

, the speed of running is a bit slow. The record of thousands of thousands of calls is about 2 minutes. Later, I found some information on the Internet. The reason is very simple. This has not been opened in batches. Although the code is set, the data level does not open a batch query. After the database is connected

db.url= "jdbc:mysql://localhost:3306/User? rewriteBatchedStatements=true";

After setting this parameter, inserting thousands of records is basically spike.

First of all, DataFrame will call the WRITE method. This method returns an ORG.APACHE.SPARK.SQL.DATAFREWRITER object. All the attribute setting methods of this object use the chain operation technology method (after setting the attribute, return this)

  def write: DataFrameWriter[T] = {
    if (isStreaming) {
        "'write' can not be called on streaming Dataset/DataFrame")
    new DataFrameWriter[T](this)

After setting the insertion attribute, call the save () method to execute the results to save. In the SAVE method, org.apache.spark.sql.execution.datasources.datasource objects are created to complete the operation of saving data by calling the WRITE (Mode, DF) method of the DataSource object.

  def save(): Unit = {
    val dataSource = DataSource(
      className = source,
      partitionColumns = partitioningColumns.getOrElse(Nil),
      bucketSpec = getBucketSpec,
      options = extraOptions.toMap)

    dataSource.write(mode, df)

WRITE method did two things, the judgment result was saved to the database, or the file system was saved. This tracking is to save the result to the data.

  def write(mode: SaveMode, data: DataFrame): Unit = {
    if ([CalendarIntervalType])) {
      throw new AnalysisException("Cannot save interval data type into external storage.")

    providingClass.newInstance() match {
      case dataSource: CreatableRelationProvider =>
        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)  // Save to database
      case format: FileFormat =>
        writeInFileFormat(format, mode, data)
      case _ =>
        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")

org.apache.spark.sql.sources.creataBlererLoveProvider#Createrelation is an interface, his implementation in organg.apache.spark.sql.execution.jdbcRoVider

  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      df: DataFrame): BaseRelation = {
    val jdbcOptions = new JDBCOptions(parameters)
    val url = jdbcOptions.url
    val table = jdbcOptions.table
    val createTableOptions = jdbcOptions.createTableOptions
    val isTruncate = jdbcOptions.isTruncate

    val conn = JdbcUtils.createConnectionFactory(jdbcOptions)()
    try {
      val tableExists = JdbcUtils.tableExists(conn, url, table)
      if (tableExists) {
        mode match {
          case SaveMode.Overwrite =>
            if (isTruncate && isCascadingTruncateTable(url) == Some(false)) {
              // In this case, we should truncate table and then load.
              truncateTable(conn, table)
              saveTable(df, url, table, jdbcOptions)
            } else {
              // Otherwise, do not truncate the table, instead drop and recreate it
              dropTable(conn, table)
              createTable(df.schema, url, table, createTableOptions, conn)
              saveTable(df, url, table, jdbcOptions)

          case SaveMode.Append =>
            saveTable(df, url, table, jdbcOptions)

          case SaveMode.ErrorIfExists =>
            throw new AnalysisException(
              s"Table or view '$table' already exists. SaveMode: ErrorIfExists.")

          case SaveMode.Ignore =>
            // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
            // to not save the contents of the DataFrame and to not change the existing data.
            // Therefore, it is okay to do nothing here and then just return the relation below.
      } else {
        createTable(df.schema, url, table, createTableOptions, conn)
        saveTable(df, url, table, jdbcOptions)
    } finally {

    createRelation(sqlContext, parameters)

Finally, through org.apache.spark.sql.execution.datasources.jdbc.jdbcutils# savetable function to complete the data insertion.

  def saveTable(
      df: DataFrame,
      url: String,
      table: String,
      options: JDBCOptions) {
    val dialect = JdbcDialects.get(url)
    val nullTypes: Array[Int] = {
     field =>
      getJdbcType(field.dataType, dialect).jdbcNullType

    val rddSchema = df.schema
    val getConnection: () => Connection = createConnectionFactory(options)
    val batchSize = options.batchSize
    val isolationLevel = options.isolationLevel
    df.foreachPartition(iterator => savePartition(
      getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)

As can be seen, DataFrame calls the ForeachPartitation function to perform partition insert operations. What is really completed is in the SavePartition, function.

 def savePartition(
      getConnection: () => Connection,
      table: String,
      iterator: Iterator[Row],
      rddSchema: StructType,
      nullTypes: Array[Int],
      batchSize: Int,
      dialect: JdbcDialect,
      isolationLevel: Int): Iterator[Byte] = {
    val conn = getConnection()
    var committed = false

    var finalIsolationLevel = Connection.TRANSACTION_NONE
    if (isolationLevel != Connection.TRANSACTION_NONE) {
      try {
        val metadata = conn.getMetaData
        if (metadata.supportsTransactions()) {
          // Update to at least use the default isolation, if any transaction level
          // has been chosen and transactions are supported
          val defaultIsolation = metadata.getDefaultTransactionIsolation
          finalIsolationLevel = defaultIsolation
          if (metadata.supportsTransactionIsolationLevel(isolationLevel))  {
            // Finally update to actually requested level if possible
            finalIsolationLevel = isolationLevel
          } else {
            logWarning(s"Requested isolation level $isolationLevel is not supported; " +
                s"falling back to default isolation level $defaultIsolation")
        } else {
          logWarning(s"Requested isolation level $isolationLevel, but transactions are unsupported")
      } catch {
        case NonFatal(e) => logWarning("Exception while detecting transaction support", e)
    val supportsTransactions = finalIsolationLevel != Connection.TRANSACTION_NONE

    try {
      if (supportsTransactions) {
        conn.setAutoCommit(false) // Everything in the same db transaction.
      val stmt = insertStatement(conn, table, rddSchema, dialect)
      val setters: Array[JDBCValueSetter] =
        .map(makeSetter(conn, dialect, _)).toArray
      val numFields = rddSchema.fields.length

      try {
        var rowCount = 0
        while (iterator.hasNext) {
          val row =
          var i = 0
          while (i < numFields) {
            if (row.isNullAt(i)) {
              stmt.setNull(i + 1, nullTypes(i))
            } else {
              setters(i).apply(stmt, row, i)
            i = i + 1
          rowCount += 1
          if (rowCount % batchSize == 0) {
            rowCount = 0
        if (rowCount > 0) {
      } finally {
      if (supportsTransactions) {
      committed = true
    } catch {
      case e: SQLException =>
        val cause = e.getNextException
        if (cause != null && e.getCause != cause) {
          if (e.getCause == null) {
          } else {
        throw e
    } finally {
      if (!committed) {
        // The stage must fail.  We got here through an exception path, so
        // let the exception through unless rollback() or close() want to
        // tell the user about another problem.
        if (supportsTransactions) {
      } else {
        // The stage must succeed.  We cannot propagate any exception close() might throw.
        try {
        } catch {
          case e: Exception => logWarning("Transaction succeeded, but closing failed", e)

Many times, I thought it was almost the same. In fact, when you did better, I found that the original doing it was much worse. So don’t give up at any time. JDBC inserted thousands of records for 2 minutes. I received more than a month for this result. Now it is finally resolved. It is still very good. Here is a record of finding optimization paths.


Related Posts

vs Foundation Mission failed. MSB4018CA

Windows use VLC plug -in to play RTSP video streams on IE browser

virtualBox Install CentOS6 Enhanced Packaging Error Solutions and Configure Sharing Folder Field

Objective-C Nstimer timing operation and cadisplayLink

JOTTINGS-Ubuntu16.04 LTS complete clone

Random Posts

web development tool class (3): jsonutils

Git experience for the first time

Sword refers to offer binary search tree and two -way linked list pythongod

QT’s default file path path path obtaining method

zabbix 3.4: zabbix monitor nginx The purpose of