-
Notifications
You must be signed in to change notification settings - Fork 439
Simple Examples of EasyML
callmevan edited this page Sep 4, 2017
·
9 revisions
本文将通过两个简单的示例分别展示如何使用EasyML以单机或分布式的方式运行程序
首先点击 Upload Data
上传本次程序需要使用的输入文件
- 数据名称
- 所属目录 --- System Data为系统实例数据目录;Shared Data为用户共享目录,此目录下的数据将可以被所有用户使用。My Data为私人目录,此目录下数据仅用户本人可见。
- 数据类型 --- 共有JSON/CSV/TSV/General四种类型,除了JSON/CSV/TSV外的数据类型全部使用General类型。例如本次使用的WordCount_test.txt,类型选择是General
- 数据版本
- 上传按钮
- 数据描述 --- 可以为空
在填写完成后,点击Submit
按钮开始上传Data
上传成功后你会看到这个提示
之后就可以在目录下查看已上传的Data
- WordCount 词频统计,将一个文件中使用的词汇及其使用次数统计后输出
点击 Upload Program
上传程序 本次上传的是无Parameter的版本
面板的详细说明请参照EasyML User's Guide
- Standalone代表以单机的方式执行,例如执行一个java或python程序;
- Distributed代表以分布式的方式执行,如一个Spark的算法包;
- ETL代表进行数据的读入和写出的模块。
- 注意到这里没有填写
Parameter
一栏,在此贴出WordCount(无Parameter)的源码
package word
/**
* Created by Administrator on 2017/8/26.
* scalaVersion := "2.11.8"
*/
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
object WordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("error")
System.exit(1)
}
val conf = new SparkConf().setAppName(args(2))
val sc = new SparkContext(conf)
val line = sc.textFile(args(0))
//output to file
line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).saveAsTextFile(args(1))
//output to screen
line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)
sc.stop()
}
}
/**
* 代码中arg(0) arg(1) arg(2)分别对应CMD中的 in out string
*/
-
Parameter
实际上是参数的一个前缀,是为了方便理解参数的含义而设置的。如果要使用Parameter
,那么需要对应的在程序中对其进行解析。例如:
- 这里给三个参数的前缀分别为
input_pt
output_pt
appname
。 相应的,需要在程序里进行解析。在此贴出WordCount(有Parameter)的源码
package word
/**
* Created by Administrator on 2017/8/26.
* scalaVersion := "2.11.8"
*/
import org.apache.spark.{SparkConf, SparkContext}
import scopt.OptionParser
import org.apache.spark.SparkContext._
object WordCount {
/** command line parameters */
case class Params(input_pt: String = "",
output_pt: String = "",
appname: String = "")
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: <file>")
System.exit(1)
}
val default_params = Params()
val parser = new OptionParser[Params]("WordCount") {
head("WordCount: Count words in documents.")
opt[String]("input_pt")
.required()
.text("Input document file path")
.action((x, c) => c.copy(input_pt = x))
opt[String]("output_pt")
.required()
.text("Output document file path")
.action((x, c) => c.copy(output_pt = x))
opt[String]("appname")
.required()
.text("appname")
.action((x, c) => c.copy(appname = x))
}
parser.parse(args, default_params).map { params =>
run(params)
} getOrElse {
System.exit(1)
}
}
def run(p:Params): Unit = {
val conf = new SparkConf().setAppName(p.appname)
val sc = new SparkContext(conf)
val line = sc.textFile(p.input_pt)
//output to file
line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).saveAsTextFile(p.output_pt)
//output to screen
line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
sc.stop()
}
}
/**
* 代码中p.input_pt p.output_pt p.appname分别对应CMD中的 in out string
*/
- CMD及参数都配置完后,点击
Generate
,你就会看到一条完整的CMD - 可以看出,Parameter字段内容以
--'Parameter'
生成在了真正的参数之前。对应的type为in
解析为:{in:Value
:"description
"}out
解析为:{out:Value
:"description
'"}string
解析为:["description
":string:default,"Default
"] - 在最后程序执行时,它会被替换为
spark-submit --class word.WordCount wordcount.jar --input_pt '文件名' --output_pt '文件名' --appname 'wordcount'
以上步骤都完成后,点击Submit
按钮开始上传Program
上传成功后你会看到这个提示
之后就可以在目录下查看已上传的Program
- FileSplit 文件内容分割,读取一个文件中的内容按照特定的比例分别存入两个文件
点击 Upload Program
上传程序
参数说明请参照1.2.1
- FileSplit(无Parameter)
package filesplit
import scala.util.Random
import scala.io.Source
import java.io._
/**
* Created by Administrator on 2017/9/1.
*/
object filesplit {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: <input_file> <rate> <output_file1> <output_file2>")
System.exit(1)
}
/**
* Split a file into two partitions.
*
* infile A file.
* rate the ratio in two partitions.
*/
val infile = readLines(args(0)).toSeq
val rate = args(1).toDouble
val inf_len = infile.size
val outf1_len = (inf_len * rate).toInt
val outf2_len = infile.size - outf1_len
val outf1 = new Array[String](outf1_len)
val outf2 = new Array[String](outf2_len)
var ind1 = 0
while (ind1 < outf1_len) {
outf1(ind1) = infile(ind1)
ind1 += 1
}
var ind2 = 0
while (ind2 < outf2_len) {
outf2(ind2) = infile(outf1_len + ind2)
val rand = Random.nextInt(outf1_len + ind2)
if (rand < outf1_len) {
swap(outf1, outf2, rand, ind2)
}
ind2 += 1
}
writeLines(args(2),outf1.toSeq)
writeLines(args(3),outf2.toSeq)
}
def readLines(pt: String): Iterator[String] = {
val rf = Source.fromFile(pt)
rf.getLines()
}
def writeLines(res_pt: String, lines: TraversableOnce[String]) {
val wf = new PrintWriter(res_pt)
lines.foreach { l => wf.write(l + '\n') }
wf.close()
}
/**
* Swap two elements in different arrays.
*
* @param f1 array stored part of the file.
* @param f2 array stored part of the file.
* @param id1 element id of array-f1.
* @param id2 element id of array-f2.
*/
private def swap(f1: Array[String], f2: Array[String], id1: Int, id2: Int): Unit = {
val str = f1(id1)
f1(id1) = f2(id2)
f2(id2) = str
}
}
/**
* 代码中arg(0) arg(1) arg(2) arg(3)分别对应CMD中的 in double out out
*/
- CMD及参数都配置完后,点击
Generate
,你就会看到一条完整的CMD
以上步骤都完成后,点击Submit
按钮开始上传Program
上传成功后你会看到这个提示
之后就可以在目录下查看已上传的Program
首先点击Create Job
新建一个空任务
- 将上传的Data
wordtest
和Programwordcount
点击加入任务wordtest
作为wordcount
的输入文件,将图标上代表输出的绿色点连接至wordcount
图标上代表输入的蓝色点
- 将上传的Data
wordtest
和Programfilesplit
点击加入任务wordtest
作为filesplit
的输入文件,将图标上代表输出的绿色点连接至filesplit
图标上代表输入的蓝色点
之后点击Submit
,输入Job Name即可提交任务
至此,所有步骤已完成。如有疑问请加入微信群咨询 EML WeChat Group
- 所执行的程序必须有明确的输入或输出,且在运行中无任何形式的交互 -- 例如按下回车或是输入y/n
- 打包时一定要包含所有的依赖包
- 如果选择将结果输出至屏幕,则所有信息无法复原且无法传递,因此推荐输出至文件
- 程序在上传前必须压缩为zip格式的压缩包,注意必须要保证程序的可执行包等包含在一个文件夹中,再对该文件夹进行压缩。具体目录结构可以参照本文给出的三个压缩包
- 如果程序需要访问外部内容,则必须先通过ETL化为内部文件方可使用(目前EML还未添加ETL功能)
-
Installation And Introduction
-
Sample Description
-
Tips For Developers
-
Problem Summary