import $ivy.`org.apache.spark::spark-sql:2.4.0` // Or use any other 2.x version here
import $ivy.`org.apache.spark::spark-mllib:2.4.0`
import $ivy.`sh.almond::ammonite-spark:0.4.0`
import $ivy.`org.datasyslab:geospark:1.2.0`
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.rdd.RDD
import org.datasyslab.geospark.enums.{GridType, IndexType}
import org.datasyslab.geospark.spatialOperator.JoinQuery
import org.datasyslab.geospark.formatMapper.shapefileParser.ShapefileReader
import scala.collection.JavaConverters._
import java.io._
import org.apache.log4j.{Level, Logger}
// Silence Spark's verbose INFO/WARN logging before the session starts.
val orgLogger = Logger.getLogger("org")
orgLogger.setLevel(Level.OFF)
import org.apache.spark.sql._
// Build (or reuse) a local Spark session dedicated to this validation run.
val spark = {
  val builder = AmmoniteSparkSession.builder()
  builder
    .master("local[*]")
    .appName("Validator")
    .getOrCreate()
}
import spark.implicits._
// Keep the application id around for reference in the notebook output.
val appID = spark.sparkContext.applicationId
Polygons from 18 states were collected, for both the source and target layers, in WKT format. They are available at: https://github.com/aocalderon/RIDIR/tree/master/Datasets/AreaTablesValidation.
import sys.process._
// Location of the WKT/TSV validation datasets on the local machine.
val path = "/home/acald013/RIDIR/Datasets/AreaTablesValidation"
// List the WKT inputs. `.!` (explicit method call) replaces the deprecated
// postfix-operator form `... !`, which needs `scala.language.postfixOps`
// and parses ambiguously in scripts.
(s"ls -lah ${path}" #| "grep wkt").!
For each set of source & target, we run the corresponding script:
Each script saves its results to disk for further analysis (the files are also available in the same repository).
// NOTE(review): `path` was re-declared here with the same value; a duplicate
// top-level `val` is a compile error when this runs as a single Scala script,
// so we reuse the earlier definition. `.!` avoids the deprecated postfix
// operator syntax.
(s"ls -lah ${path}" #| "grep tsv").!
// State under validation; both result files follow the naming pattern
// <state>_<tool>_test.tsv.
val state = "NY"
// Read a headerless, tab-separated results file for the given tool and
// drop duplicate rows.
def readResults(tool: String): DataFrame =
  spark.read
    .option("header", "false")
    .option("delimiter", "\t")
    .csv(s"${path}/${state}_${tool}_test.tsv")
    .distinct()
val geopandas = readResults("geopandas")
geopandas.count()
val geospark = readResults("geospark")
geospark.count()
// Normalize a results DataFrame into an RDD of areas: parse the three string
// columns as a (Int, Int, Double) triple — presumably (sourceId, targetId,
// area); confirm against the generating scripts — sort by the full triple so
// both sources line up row-for-row, then keep only the area value.
// `sortBy(identity)` on the triple is the same ordering as sorting by
// (t._1, t._2, t._3) explicitly.
def sortedAreas(df: DataFrame): RDD[Double] =
  df.map(row => (row.getString(0).toInt, row.getString(1).toInt, row.getString(2).toDouble))
    .rdd
    .sortBy(identity)
    .map(_._3)
val p = sortedAreas(geopandas)
val s = sortedAreas(geospark)
// Pair the two area streams element-wise. NOTE(review): RDD.zip requires both
// RDDs to have the same number of partitions AND the same number of elements
// per partition — this only holds if both files contain exactly the same keys;
// verify the two counts above match before trusting the metrics.
val areas = p.zip(s)
areas.toDF("area1", "area2").show(truncate = false)
// RegressionMetrics treats each pair as (prediction, observation); here it
// quantifies how closely the two implementations' areas agree.
val reg = new RegressionMetrics(areas)
reg.r2
reg.meanAbsoluteError
reg.meanSquaredError
reg.rootMeanSquaredError