Spark读写MySQL数据库

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

Spark读写MySQL数据库

文章目录

一、读取数据库

一通过RDD的方式读取MySQL数据库

四要素驱动、连接地址、账号密码

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.sql.SparkSession

import java.sql.DriverManager

/**
 * 使用RDD读取MySQL数据库
 */
object spark_read_mysql {
  def main(args: Array[String]): Unit = {

    //创建SparkSession作用连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]") //指定运行的方式
      .appName("spark_read_mysql") //程序的名字
      .getOrCreate()
    //创建SparkContext
    val sc = spark.sparkContext

    //驱动名称
    val driver = "com.mysql.cj.jdbc.Driver"
    //连接信息
    val url = "jdbc:mysql://192.168.80.145:3306/test"
    //用户名
    val username = "root"
    //密码
    val password = "123456"

    //具体的SQL查询语句
    val sql = "select * from t_user where id>=? and id<=?"

    //查询
    val rsRDD = new JdbcRDD(
      sc,
      ()=>{
        //加载驱动
        Class.forName(driver)
        //创建和MySQL数据库的连接
        DriverManager.getConnection(url,username,password)
      },
      //需要执行的SQL语句
      sql,
      //查询的开始行
      1,
      //查询的结束行
      20,
      //运行几个分区执行
      2,
      //返回值的处理将返回值变为RDD的元素数字从1开始表示字段的编号
      rs => (rs.getInt(1),rs.getString(2),rs.getInt(3))
    )

    //将RDD的元素打印在终端
    rsRDD.collect().foreach(println)

    sc.stop()
  }
}
二通过DataFrame的方式读取MySQL数据库
import org.apache.spark.sql.SparkSession

/**
 * 使用DataFrame读取MySQL数据库
 */
object spark_read_mysql2 {
  def main(args: Array[String]): Unit = {

    //创建SparkSession作用连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]")//指定运行的方式
      .appName("spark_read_mysql2")//程序的名字
      .getOrCreate()

    //创建DataFrame
    val jdbcDF = spark.read.format("jdbc")
      .option("url","jdbc:mysql://192.168.80.145:3306/test")//指定连接
      .option("driver","com.mysql.cj.jdbc.Driver")//指定驱动
      .option("user","root")//指定连接的用户
      .option("password","123456")//指定连接的用户的密码
      .option("dbtable","t_user")//查询的表
      .load()//加载数据库表

    //在终端显示DataFrame的内容
    jdbcDF.show()
  }
}

二、添加数据到MySQL

一通过RDD的方式插入数据到MySQL

每个分区执行一次创建连接和关闭连接

import org.apache.spark.sql.SparkSession

import java.sql.DriverManager

/**
 * 使用RDD插入数据到MySQLRDD的每个元素都会执行一次创建连接和关闭连接
 */
object spark_write_mysql {
  def main(args: Array[String]): Unit = {

    //创建SparkSession作用连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]") //指定运行的方式
      .appName("spark_write_mysql") //程序的名字
      .getOrCreate()
    //创建SparkContext
    val sc = spark.sparkContext

    //驱动名称
    val driver = "com.mysql.cj.jdbc.Driver"
    //连接信息
    //?useUnicode=true&characterEncoding=UTF-8 指定连接的参数字符集为utf8防止插入的数据中文乱码
    val url = "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8"
    //用户名
    val username = "root"
    //密码
    val password = "123456"

    //创建RDD
    val rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19)))

    //打印RDD的元素
    //rdd.collect().foreach(println)

    //通过循环的方式读取RDD的每条元素将元素插入MySQL一个元素执行一次创建连接和插入和关闭连接
    rdd.foreach {
      case (name,age) =>{
        //加载驱动
        Class.forName(driver)
        //创建和MySQL的链接
        val conn = DriverManager.getConnection(url,username,password)
        //添加的SQL语句
        val sql = "insert into t_user(name,age) values(?,?)"
        //给SQL语句配置参数
        val ps = conn.prepareStatement(sql)
        //根据参数的类型配置参数
        ps.setString(1,name)
        ps.setInt(2,age)
        //执行SQL语句
        ps.executeUpdate()
        //关闭连接
        ps.close()
        conn.close()
      }
    }

    sc.stop()
  }
}
二通过RDD的方式插入数据到MySQL 2

每个分区执行一次创建连接和关闭连接

import org.apache.spark.sql.SparkSession

import java.sql.DriverManager

/**
 * 使用RDD插入数据到MySQLRDD的每个分区执行一次创建连接和关闭连接推荐
 */
object spark_write_mysql2 {
  def main(args: Array[String]): Unit = {

    //创建SparkSession作用连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]") //指定运行的方式
      .appName("spark_write_mysql2") //程序的名字
      .getOrCreate()
    //创建SparkContext
    val sc = spark.sparkContext

    //驱动名称
    val driver = "com.mysql.cj.jdbc.Driver"
    //连接信息
    //?useUnicode=true&characterEncoding=UTF-8 指定连接的参数字符集为utf8防止插入的数据中文乱码
    val url = "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8"
    //用户名
    val username = "root"
    //密码
    val password = "123456"

    //创建RDD
    val rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19)))

    //打印RDD的元素
    //rdd.collect().foreach(println)

    //通过循环的方式读取RDD的每个分区将元素插入MySQL一个分区执行一次创建连接和关闭连接
    rdd.foreachPartition {
      datas =>{
        //加载驱动
        Class.forName(driver)
        //创建和MySQL的链接
        val conn = DriverManager.getConnection(url,username,password)
        //添加的SQL语句
        val sql = "insert into t_user(name,age) values(?,?)"
        //给SQL语句配置参数
        val ps = conn.prepareStatement(sql)
        //根据参数的类型配置参数
        datas.foreach{
          case (name,age)=>{
            ps.setString(1,name)
            ps.setInt(2,age)

            //执行SQL语句
            ps.executeUpdate()
          }
        }
        //关闭连接
        ps.close()
        conn.close()
      }
    }

    sc.stop()
  }
}
三使用DataFrame插入数据到MySQL
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
 * 使用DataFrame插入数据到MySQL
 */

object spark_write_mysql3 {
  def main(args: Array[String]): Unit = {

    //创建SparkSession作用连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]") //指定运行的方式
      .appName("spark_write_mysql3") //程序的名字
      .getOrCreate()

    //1.创建DataFrame
    //1.1 schema
    val schema = StructType(List(StructField("name", StringType,true),StructField("age",IntegerType,true)))
    //1.2 行rows
    //1.2.1 创建RDD
    val dataRDD = spark.sparkContext.parallelize(Array(Array("李四",20),Array("王五",20)))
    //1.2.2 创建rows
    val rows = dataRDD.map(x=>Row(x(0),x(1)))
    //1.3 拼接表头schema和行内容rows
    val df = spark.createDataFrame(rows,schema)

    //2.通过DataFrame插入数据到MySQL
    //如果直接使用df.write则会将整个DataFrame的表写入MySQL形成一个新表需要注意表不能存在
    //df.write.mode("append")是以追加的方式将数据写入到已经存在的表中
    df.write
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8") //指定连接
      .option("driver", "com.mysql.cj.jdbc.Driver") //指定驱动
      .option("user", "root") //指定连接的用户
      .option("password", "123456") //指定连接的用户的密码
      .option("dbtable", "t_user2") //查询的表
      .save()//保存数据
  }
}
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

“Spark读写MySQL数据库” 的相关文章

Linux系统中jdk环境怎么配置 - 开发技术

这篇文章主要介绍“Linux系统中jdk环境怎么配置”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Linux系统中jdk环境怎么配置”文章能帮助大家解决问题。如下操作步骤为linux系统中部署jdk环境1.下载jdk安装包&n...

thinkphp5.0如何配置错误页 - 编程语言

这篇文章主要讲解了“thinkphp5.0如何配置错误页”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“thinkphp5.0如何配置错误页”吧! I. ThinkPHP 5.0 错误页的作用...

thinkphp如何关闭所有缓存 - 编程语言

这篇文章主要介绍“thinkphp如何关闭所有缓存”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“thinkphp如何关闭所有缓存”文章能帮助大家解决问题。 首先,我们需要了解 ThinkPHP 中的...

协作图

协作图是一种交互图,是动态视图的另一种表现形式,强调的是发送和接收消息的对象之间的组织结构与交互,而时序图则是系统与环境的交互。 协作:描述了在一定的语境中一组对象,以及用以实现某些行为的这些对象间的相互作用。 交互:是协作的一个消息集合,这些消息被类元角色...

Java中StringRedisTemplate和RedisTemplate怎么使用 - 开发技术

这篇文章主要介绍“Java中StringRedisTemplate和RedisTemplate怎么使用”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Java中StringRedisTemplate和RedisTemplate怎...

extern变量不能为static

static变量,即使在其它文件被声明为extern,链接器也不会找到他...