# R on Spark

SparkR is an R package that provides a light-weight frontend to use Spark from R.

### Installing sparkR

The SparkR libraries need to be created in `$SPARK_HOME/R/lib`. This can be done by running the script `$SPARK_HOME/R/install-dev.sh`.
By default, the script uses the system-wide installation of R. To use a user-installed copy of R instead, set the environment variable `R_HOME` to the full path of the base directory where R is installed before running the `install-dev.sh` script.

Example:

``` bash
# where /home/username/R is where R is installed and /home/username/R/bin contains the files R and Rscript
export R_HOME=/home/username/R
./install-dev.sh
```

### SparkR development

#### Build Spark

Build Spark with [Maven](https://spark.apache.org/docs/latest/building-spark.html#buildmvn) or [SBT](https://spark.apache.org/docs/latest/building-spark.html#building-with-sbt), and include the `-Psparkr` profile to build the R package. For example, to use the default Hadoop versions, you can run

``` bash
# Maven
./build/mvn -DskipTests -Psparkr package

# SBT
./build/sbt -Psparkr package
```

#### Running sparkR

You can start using SparkR by launching the SparkR shell with

``` R
./bin/sparkR
```

The `sparkR` script automatically creates a SparkContext, running Spark in local mode by default. To specify the Spark master of a cluster for the automatically created SparkContext, you can run

``` R
./bin/sparkR --master "local[2]"
```

To set other options such as driver memory or executor memory, pass the corresponding [spark-submit](https://spark.apache.org/docs/latest/submitting-applications.html) arguments to `./bin/sparkR`.

#### Using SparkR from RStudio

If you wish to use SparkR from RStudio, please refer to the [SparkR documentation](https://spark.apache.org/docs/latest/sparkr.html#starting-up-from-rstudio).

#### Making changes to SparkR

The [instructions](https://spark.apache.org/contributing.html) for making contributions to Spark also apply to SparkR.
If you only make R file changes (i.e. no Scala changes), you can just re-install the R package using `R/install-dev.sh` and test your changes.
Once you have made your changes, please include unit tests for them and run the existing unit tests using the `R/run-tests.sh` script as described below.

#### Generating documentation

The SparkR documentation (Rd files and HTML files) is not part of the source repository. To generate it, run the script `R/create-docs.sh`. This script uses `devtools` and `knitr` to generate the docs, so these packages need to be installed on the machine before using the script. You may also need to install these [prerequisites](https://github.com/apache/spark/tree/master/docs#prerequisites). See also `R/DOCUMENTATION.md`.

### Examples, Unit tests

SparkR comes with several sample programs in the `examples/src/main/r` directory.
To run one of them, use `./bin/spark-submit` followed by the path to the script. For example:

``` bash
./bin/spark-submit examples/src/main/r/dataframe.R
```

You can run R unit tests by following the instructions under [Running R Tests](https://spark.apache.org/docs/latest/building-spark.html#running-r-tests).

### Running on YARN

`./bin/spark-submit` can also be used to submit jobs to YARN clusters. You will need to set the YARN configuration directory (`YARN_CONF_DIR`) before doing so.
For example, on CDH you can run

``` bash
export YARN_CONF_DIR=/etc/hadoop/conf
./bin/spark-submit --master yarn examples/src/main/r/dataframe.R
```

# Package index

## Distributed Data Frame

- `SparkDataFrame-class` : S4 class that represents a SparkDataFrame
- `groupedData()` : S4 class that represents a GroupedData
- `agg()` `summarize()` : summarize
- `arrange()` `orderBy(,)` : Arrange Rows by Variables
- `approxQuantile(,,,)` : Calculates the approximate quantiles of numerical columns of a SparkDataFrame
- `as.data.frame()` : Download data from a SparkDataFrame into an R data.frame
- `attach()` : Attach SparkDataFrame to R search path
- `broadcast()` : broadcast
- `cache()` : Cache
- `cacheTable()` : Cache Table
- `checkpoint()` : checkpoint
- `collect()` : Collects all the elements of a SparkDataFrame and coerces them into an R data.frame.
- `coltypes()` `` `coltypes<-`() `` : coltypes
- `colnames()` `` `colnames<-`() `` `columns()` `names()` `` `names<-`() `` : Column Names of SparkDataFrame
- `count()` `n()` : Count
- `createDataFrame()` `as.DataFrame()` : Create a SparkDataFrame
- `createExternalTable()` : (Deprecated) Create an external table
- `createOrReplaceTempView()` : Creates a temporary view using the given name.
- `createTable()` : Creates a table based on the dataset in a data source
- `crossJoin(,)` : CrossJoin
- `crosstab(,,)` : Computes a pair-wise frequency table of the given columns
- `cube()` : cube
- `describe()` : describe
- `distinct()` `unique()` : Distinct
- `dim()` : Returns the dimensions of SparkDataFrame
- `drop()` : drop
- `dropDuplicates()` : dropDuplicates
- `dropna()` `na.omit()` `fillna()` : A set of SparkDataFrame functions working with NA values
- `dtypes()` : DataTypes
- `except()` : except
- `exceptAll()` : exceptAll
- `explain()` : Explain
- `filter()` `where()` : Filter
- `getNumPartitions()` : getNumPartitions
- `group_by()` `groupBy()` : GroupBy
- `head()` : Head
- `hint()` : hint
- `histogram(,)` : Compute histogram statistics for given column
- `insertInto()` : insertInto
- `intersect()` : Intersect
- `intersectAll()` : intersectAll
- `isLocal()` : isLocal
- `isStreaming()` : isStreaming
- `join(,)` : Join
- `limit()` : Limit
- `localCheckpoint()` : localCheckpoint
- `merge()` : Merges two data frames
- `mutate()` `transform()` : Mutate
- `ncol()` : Returns the number of columns in a SparkDataFrame
- `count()` `nrow()` : Returns the number of rows in a SparkDataFrame
- `orderBy()` : Ordering Columns in a WindowSpec
- `persist()` : Persist
- `pivot(,)` : Pivot a column of the GroupedData and perform the specified aggregation.
- `printSchema()` : Print Schema of a SparkDataFrame
- `randomSplit()` : randomSplit
- `rbind()` : Union two or more SparkDataFrames
- `rename()` `withColumnRenamed()` : rename
- `registerTempTable()` : (Deprecated) Register Temporary Table
- `repartition()` : Repartition
- `repartitionByRange()` : Repartition by range
- `rollup()` : rollup
- `sample()` `sample_frac()` : Sample
- `sampleBy()` : Returns a stratified sample without replacement
- `saveAsTable()` : Save the contents of the SparkDataFrame to a data source as a table
- `schema()` : Get schema object
- `select()` `` `$`() `` `` `$<-`() `` : Select
- `selectExpr()` : SelectExpr
- `show()` : show
- `showDF()` : showDF
- `str()` : Compactly display the structure of a dataset
- `storageLevel()` : StorageLevel
- `subset()` `` `[[`(,) `` `` `[[<-`(,) `` `` `[`() `` : Subset
- `summary()` : summary
- `take()` : Take the first NUM rows of a SparkDataFrame and return the results as an R data.frame
- `tableToDF()` : Create a SparkDataFrame from a SparkSQL table or view
- `toJSON()` : toJSON
- `union()` : Return a new SparkDataFrame containing the union of rows
- `unionAll()` : Return a new SparkDataFrame containing the union of rows.
- `unionByName()` : Return a new SparkDataFrame containing the union of rows, matched by column names
- `unpersist()` : Unpersist
- `unpivot()` `melt(,,,,)` : Unpivot a DataFrame from wide format to long format.
- `with()` : Evaluate an R expression in an environment constructed from a SparkDataFrame
- `withColumn()` : WithColumn

## Data import and export

- `read.df()` `loadDF()` : Load a SparkDataFrame
- `read.jdbc()` : Create a SparkDataFrame representing the database table accessible via JDBC URL
- `read.json()` : Create a SparkDataFrame from a JSON file.
- `read.orc()` : Create a SparkDataFrame from an ORC file.
- `read.parquet()` : Create a SparkDataFrame from a Parquet file.
- `read.text()` : Create a SparkDataFrame from a text file.
- `write.df()` `saveDF()` : Save the contents of SparkDataFrame to a data source.
- `write.jdbc()` : Save the content of SparkDataFrame to an external database table via JDBC.
- `write.json()` : Save the contents of SparkDataFrame as a JSON file
- `write.orc()` : Save the contents of SparkDataFrame as an ORC file, preserving the schema.
- `write.parquet()` : Save the contents of SparkDataFrame as a Parquet file, preserving the schema.
- `write.text()` : Save the content of SparkDataFrame in a text file at the specified path.

## Column functions

- `approx_count_distinct()` `approxCountDistinct()` `collect_list()` `collect_set()` `count_distinct()` `countDistinct()` `grouping_bit()` `grouping_id()` `kurtosis()` `max_by()` `min_by()` `n_distinct()` `percentile_approx()` `product()` `sd()` `skewness()` `stddev()` `std()` `stddev_pop()` `stddev_samp()` `sum_distinct()` `sumDistinct()` `var()` `variance()` `var_pop()` `var_samp()` `max()` `mean()` `min()` `sum()` : Aggregate functions for Column operations
- `from_avro()` `to_avro()` : Avro processing functions for Column operations
- `array_aggregate()` `array_contains()` `array_distinct()` `array_except()` `array_exists()` `array_forall()` `array_filter()` `array_intersect()` `array_join()` `array_max()` `array_min()` `array_position()` `array_remove()` `array_repeat()` `array_sort()` `array_transform()` `arrays_overlap()` `array_union()` `arrays_zip()` `arrays_zip_with()` `concat()` `element_at()` `explode()` `explode_outer()` `flatten()` `from_json()` `from_csv()` `map_concat()` `map_entries()` `map_filter()` `map_from_arrays()` `map_from_entries()` `map_keys()` `map_values()` `map_zip_with()` `posexplode()` `posexplode_outer()` `reverse()` `schema_of_csv()` `schema_of_json()` `shuffle()` `size()` `slice()` `sort_array()` `transform_keys()` `transform_values()` `to_json()` `to_csv()` : Collection functions for Column operations
- `add_months()` `datediff()` `date_add()` `date_format()` `date_sub()`
`from_utc_timestamp()` `months_between()` `next_day()` `to_utc_timestamp()` : Date time arithmetic functions for Column operations
- `bin()` `bround()` `cbrt()` `ceil()` `conv()` `cot()` `csc()` `hex()` `hypot()` `ln()` `pmod()` `rint()` `sec()` `shiftLeft()` `shiftleft()` `shiftRight()` `shiftright()` `shiftRightUnsigned()` `shiftrightunsigned()` `signum()` `degrees()` `toDegrees()` `radians()` `toRadians()` `unhex()` `width_bucket()` `abs()` `acos()` `acosh()` `asin()` `asinh()` `atan()` `atanh()` `ceiling()` `cos()` `cosh()` `exp()` `expm1()` `factorial()` `floor()` `log()` `log10()` `log1p()` `log2()` `round()` `sign()` `sin()` `sinh()` `sqrt()` `tan()` `tanh()` `atan2()` : Math functions for Column operations
- `assert_true()` `crc32()` `hash()` `md5()` `raise_error()` `sha1()` `sha2()` `xxhash64()` : Miscellaneous functions for Column operations
- `array_to_vector()` `vector_to_array()` : ML functions for Column operations
- `when()` `bitwise_not()` `bitwiseNOT()` `create_array()` `create_map()` `expr()` `greatest()` `input_file_name()` `isnan()` `least()` `lit()` `monotonically_increasing_id()` `nanvl()` `negate()` `negative()` `positive()` `rand()` `randn()` `spark_partition_id()` `struct()` `coalesce()` `is.nan()` `ifelse()` : Non-aggregate functions for Column operations
- `ascii()` `base64()` `bit_length()` `concat_ws()` `decode()` `encode()` `format_number()` `format_string()` `initcap()` `instr()` `levenshtein()` `locate()` `lower()` `lpad()` `ltrim()` `octet_length()` `overlay()` `regexp_extract()` `regexp_replace()` `repeat_string()` `rpad()` `rtrim()` `split_string()` `soundex()` `substring_index()` `translate()` `trim()` `unbase64()` `upper()` `length()` : String functions for Column operations
- `cume_dist()` `dense_rank()` `lag()` `lead()` `nth_value()` `ntile()` `percent_rank()` `rank()` `row_number()` : Window functions for Column operations
- `alias()` : alias
- `asc()` `asc_nulls_first()` `asc_nulls_last()` `contains()` `desc()`
`desc_nulls_first()` `desc_nulls_last()` `getField()` `getItem()` `isNaN()` `isNull()` `isNotNull()` `like()` `rlike()` `ilike()` : A set of operations working with SparkDataFrame columns
- `avg()` : avg
- `between()` : between
- `cast()` : Casts the column to a different data type.
- `column()` : S4 class that represents a SparkDataFrame column
- `coalesce()` : Coalesce
- `corr()` : corr
- `cov()` `covar_samp()` `covar_pop()` : cov
- `dropFields()` : dropFields
- `endsWith()` : endsWith
- `first()` : Return the first row of a SparkDataFrame
- `last()` : last
- `not()` `` `!`() `` : \!
- `otherwise()` : otherwise
- `startsWith()` : startsWith
- `substr()` : substr
- `current_date()` `current_timestamp()` `date_trunc()` `dayofmonth()` `dayofweek()` `dayofyear()` `from_unixtime()` `hour()` `last_day()` `make_date()` `minute()` `month()` `quarter()` `second()` `timestamp_seconds()` `to_date()` `to_timestamp()` `unix_timestamp()` `weekofyear()` `window()` `year()` `trunc()` : Date time functions for Column operations
- `withField()` : withField
- `over()` : over
- `predict()` : Makes predictions from a MLlib model
- `partitionBy()` : partitionBy
- `rangeBetween()` : rangeBetween
- `rowsBetween()` : rowsBetween
- `windowOrderBy()` : windowOrderBy
- `windowPartitionBy()` : windowPartitionBy
- `WindowSpec-class` : S4 class that represents a WindowSpec
- `` `%in%`() `` : Match a column with given values.
- `` `%<=>%` `` : %\<=\>%

## Schema Definitions

- `structField()` : structField
- `structType()` : structType

## Structured Streaming

- `StreamingQuery-class` : S4 class that represents a StreamingQuery
- `awaitTermination()` : awaitTermination
- `isActive()` : isActive
- `queryName()` : queryName
- `lastProgress()` : lastProgress
- `read.stream()` : Load a streaming SparkDataFrame
- `status()` : status
- `stopQuery()` : stopQuery
- `withWatermark()` : withWatermark
- `write.stream()` : Write the streaming SparkDataFrame to a data source.
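
The streaming entries above fit together roughly as in the following sketch. It is only an illustration, not part of the reference: it assumes a local Spark installation with SparkR attached, uses Spark's built-in `rate` test source and `console` sink, and the application name and the 10-second timeout are arbitrary choices.

``` r
library(SparkR)

# Assumption: Spark is installed locally; "local[2]" matches the master used in the README above.
sparkR.session(master = "local[2]", appName = "streaming-sketch")

# The built-in "rate" source emits rows with `timestamp` and `value` columns.
ticks <- read.stream("rate", rowsPerSecond = 5)
stopifnot(isStreaming(ticks))

# Ordinary SparkDataFrame operations also apply to a streaming SparkDataFrame.
evens <- filter(ticks, ticks$value %% 2 == 0)

# write.stream() starts the query and returns a StreamingQuery handle.
query <- write.stream(evens, "console", outputMode = "append")

# Block for up to 10 seconds (the timeout is in milliseconds), then inspect and stop.
awaitTermination(query, 10000)
print(isActive(query))
stopQuery(query)

sparkR.session.stop()
```

While the query runs, `status(query)` and `lastProgress(query)` can be called on the same handle to see what the query is doing.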
## Spark MLlib

MLlib is Spark’s machine learning (ML) library.

- `AFTSurvivalRegressionModel-class` : S4 class that represents an AFTSurvivalRegressionModel
- `ALSModel-class` : S4 class that represents an ALSModel
- `BisectingKMeansModel-class` : S4 class that represents a BisectingKMeansModel
- `DecisionTreeClassificationModel-class` : S4 class that represents a DecisionTreeClassificationModel
- `DecisionTreeRegressionModel-class` : S4 class that represents a DecisionTreeRegressionModel
- `FMClassificationModel-class` : S4 class that represents a FMClassificationModel
- `FMRegressionModel-class` : S4 class that represents a FMRegressionModel
- `FPGrowthModel-class` : S4 class that represents a FPGrowthModel
- `GBTClassificationModel-class` : S4 class that represents a GBTClassificationModel
- `GBTRegressionModel-class` : S4 class that represents a GBTRegressionModel
- `GaussianMixtureModel-class` : S4 class that represents a GaussianMixtureModel
- `GeneralizedLinearRegressionModel-class` : S4 class that represents a generalized linear model
- `glm(,,)` : Generalized Linear Models (R-compliant)
- `IsotonicRegressionModel-class` : S4 class that represents an IsotonicRegressionModel
- `KMeansModel-class` : S4 class that represents a KMeansModel
- `KSTest-class` : S4 class that represents a KSTest
- `LDAModel-class` : S4 class that represents an LDAModel
- `LinearRegressionModel-class` : S4 class that represents a LinearRegressionModel
- `LinearSVCModel-class` : S4 class that represents a LinearSVCModel
- `LogisticRegressionModel-class` : S4 class that represents a LogisticRegressionModel
- `MultilayerPerceptronClassificationModel-class` : S4 class that represents a MultilayerPerceptronClassificationModel
- `NaiveBayesModel-class` : S4 class that represents a NaiveBayesModel
- `PowerIterationClustering-class` : S4 class that represents a PowerIterationClustering
- `PrefixSpan-class` : S4 class that represents a PrefixSpan
- `RandomForestClassificationModel-class` :
S4 class that represents a RandomForestClassificationModel
- `RandomForestRegressionModel-class` : S4 class that represents a RandomForestRegressionModel
- `fitted()` : Get fitted result from a k-means model
- `freqItems(,)` : Finding frequent items for columns, possibly with false positives
- `spark.als()` `summary()` `predict()` `write.ml(,)` : Alternating Least Squares (ALS) for Collaborative Filtering
- `spark.bisectingKmeans()` `summary()` `predict()` `fitted()` `write.ml(,)` : Bisecting K-Means Clustering Model
- `spark.decisionTree()` `summary()` `print()` `predict()` `write.ml(,)` : Decision Tree Model for Regression and Classification
- `spark.fmClassifier()` `summary()` `predict()` `write.ml(,)` : Factorization Machines Classification Model
- `spark.fmRegressor()` `summary()` `predict()` `write.ml(,)` : Factorization Machines Regression Model
- `spark.fpGrowth()` `spark.freqItemsets()` `spark.associationRules()` `predict()` `write.ml(,)` : FP-growth
- `spark.gaussianMixture()` `summary()` `predict()` `write.ml(,)` : Multivariate Gaussian Mixture Model (GMM)
- `spark.gbt()` `summary()` `print()` `predict()` `write.ml(,)` : Gradient Boosted Tree Model for Regression and Classification
- `spark.glm()` `summary()` `print()` `predict()` `write.ml(,)` : Generalized Linear Models
- `spark.isoreg()` `summary()` `predict()` `write.ml(,)` : Isotonic Regression Model
- `spark.kmeans()` `summary()` `predict()` `write.ml(,)` : K-Means Clustering Model
- `spark.kstest()` `summary()` `print()` : (One-Sample) Kolmogorov-Smirnov Test
- `spark.lda()` `spark.posterior()` `spark.perplexity()` `summary()` `write.ml(,)` : Latent Dirichlet Allocation
- `spark.lm()` `summary()` `predict()` `write.ml(,)` : Linear Regression Model
- `spark.logit()` `summary()` `predict()` `write.ml(,)` : Logistic Regression Model
- `spark.mlp()` `summary()` `predict()` `write.ml(,)` : Multilayer
Perceptron Classification Model
- `spark.naiveBayes()` `summary()` `predict()` `write.ml(,)` : Naive Bayes Models
- `spark.assignClusters()` : PowerIterationClustering
- `spark.findFrequentSequentialPatterns()` : PrefixSpan
- `spark.randomForest()` `summary()` `print()` `predict()` `write.ml(,)` : Random Forest Model for Regression and Classification
- `spark.survreg()` `summary()` `predict()` `write.ml(,)` : Accelerated Failure Time (AFT) Survival Regression Model
- `spark.svmLinear()` `predict()` `summary()` `write.ml(,)` : Linear SVM Model
- `read.ml()` : Load a fitted MLlib model from the input path.
- `write.ml()` : Saves the MLlib model to the input path

## Distributed R

- `dapply()` : dapply
- `dapplyCollect()` : dapplyCollect
- `gapply()` : gapply
- `gapplyCollect()` : gapplyCollect
- `spark.lapply()` : Run a function over a list of elements, distributing the computations with Spark

## SQL Catalog

- `currentCatalog()` : Returns the current default catalog
- `currentDatabase()` : Returns the current default database
- `databaseExists()` : Checks if the database with the specified name exists.
- `dropTempTable()` : (Deprecated) Drop Temporary Table
- `dropTempView()` : Drops the temporary view with the given view name in the catalog.
- `functionExists()` : Checks if the function with the specified name exists.
- `getDatabase()` : Get the database with the specified name
- `getFunc()` : Get the function with the specified name
- `getTable()` : Get the table with the specified name
- `listCatalogs()` : Returns a list of catalogs available
- `listColumns()` : Returns a list of columns for the given table/view in the specified database
- `listDatabases()` : Returns a list of databases available
- `listFunctions()` : Returns a list of functions registered in the specified database
- `listTables()` : Returns a list of tables or views in the specified database
- `refreshByPath()` : Invalidates and refreshes all the cached data and metadata for SparkDataFrame containing path
- `refreshTable()` : Invalidates and refreshes all the cached data and metadata of the given table
- `recoverPartitions()` : Recovers all the partitions in the directory of a table and updates the catalog
- `setCurrentCatalog()` : Sets the current default catalog
- `setCurrentDatabase()` : Sets the current default database
- `tableExists()` : Checks if the table with the specified name exists.
- `tableNames()` : Table Names
- `tables()` : Tables
- `uncacheTable()` : Uncache Table

## Spark Session and Context

- `cancelJobGroup()` : Cancel active jobs for the specified group
- `cancelJobsWithTag()` : Cancel active jobs that have the specified tag.
- `clearCache()` : Clear Cache
- `clearJobGroup()` : Clear current job group ID and its description
- `getLocalProperty()` : Get a local property set in this thread, or `NULL` if it is missing. See `setLocalProperty`.
- `install.spark()` : Download and Install Apache Spark to a Local Directory
- `setCheckpointDir()` : Set checkpoint directory
- `setJobDescription()` : Set a human readable description of the current job.
- `setInterruptOnCancel()` : Set the behavior of job cancellation from jobs started in this thread.
- `setJobGroup()` : Assigns a group ID to all the jobs started by this thread until the group ID is set to a different value or cleared.
- `addJobTag()` : Add a tag to be assigned to all the jobs started by this thread.
- `removeJobTag()` : Remove a tag previously added to be assigned to all the jobs started by this thread. Noop if such a tag was not added earlier.
- `getJobTags()` : Get the tags that are currently set to be assigned to all the jobs started by this thread.
- `clearJobTags()` : Clear the current thread's job tags.
- `setLocalProperty()` : Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.
- `setLogLevel()` : Set new log level
- `spark.addFile()` : Add a file or directory to be downloaded with this Spark job on every node.
- `spark.getSparkFiles()` : Get the absolute path of a file added through spark.addFile.
- `spark.getSparkFilesRootDirectory()` : Get the root directory that contains files added through spark.addFile.
- `sparkR.conf()` : Get Runtime Config from the current active SparkSession
- `sparkR.callJMethod()` : Call Java Methods
- `sparkR.callJStatic()` : Call Static Java Methods
- `sparkR.init()` : (Deprecated) Initialize a new Spark Context
- `sparkR.newJObject()` : Create Java Objects
- `sparkR.session()` : Get the existing SparkSession or initialize a new SparkSession.
- `sparkR.session.stop()` `sparkR.stop()` : Stop the Spark Session and Spark Context
- `sparkR.uiWebUrl()` : Get the URL of the SparkUI instance for the current active SparkSession
- `sparkR.version()` : Get version of Spark on which this application is running
- `sparkRHive.init()` : (Deprecated) Initialize a new HiveContext
- `sparkRSQL.init()` : (Deprecated) Initialize a new SQLContext
- `sql()` : SQL Query

# Articles

### All vignettes

- [SparkR - Practical Guide](https://spark.apache.org/docs/3.5.8/api/R/articles/sparkr-vignettes.md)
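
To give a feel for how the indexed functions combine, here is a minimal sketch of a SparkR session. It is an illustration only: it assumes a local Spark installation with SparkR attached, the application name is arbitrary, and it uses R's built-in `faithful` dataset.

``` r
library(SparkR)

# Assumption: Spark is installed locally; get the existing SparkSession or start a new one.
sparkR.session(master = "local[2]", appName = "sparkr-sketch")

# Turn a local R data.frame into a distributed SparkDataFrame.
df <- createDataFrame(faithful)
printSchema(df)

# Select and filter with column expressions.
head(select(df, df$eruptions))
head(filter(df, df$waiting < 50))

# Group and aggregate, then sort by the aggregated column.
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

# collect() brings the (small) result back as a local R data.frame.
local_counts <- collect(waiting_counts)
str(local_counts)

sparkR.session.stop()
```

The same script can be saved to a file and passed to `./bin/spark-submit`, as described in the README section above.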