diff --git a/datasource-webhdfs/README.md b/datasource-webhdfs/README.md
new file mode 100644
index 00000000..7158c34a
--- /dev/null
+++ b/datasource-webhdfs/README.md
@@ -0,0 +1,55 @@
+A custom data source to read and write data from and to remote HDFS clusters using the [WebHDFS](https://hadoop.apache.org/docs/r2.7.3/hadoop-project-dist/hadoop-hdfs/WebHDFS.html) protocol.
+
+## Linking
+
+Using SBT:
+
+```scala
+libraryDependencies += "org.apache.bahir" %% "spark-datasource-webhdfs" % "2.1.0-SNAPSHOT"
+```
+
+Using Maven (Scala version 2.11):
+
+```xml
+<dependency>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>spark-datasource-webhdfs_2.11</artifactId>
+    <version>2.1.0-SNAPSHOT</version>
+</dependency>
+```
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+```Shell
+$ bin/spark-shell --packages org.apache.bahir:spark-datasource-webhdfs_2.11:2.1.0-SNAPSHOT
+```
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is compiled for Scala 2.10 and 2.11, and intended to support Spark 2.0 onwards.
+
+## Examples
+
+A data frame can be created using this custom data source as shown below:
+
+```scala
+val filePath = s"webhdfs:///gateway/default/webhdfs/v1/"
+
+val df = spark.read
+  .format("webhdfs")
+  .option("certValidation", "Y")
+  .option("userCred", "user1:pass1")
+  .option("header", "true")
+  .option("partitions", "8")
+  .load(filePath)
+```
+
+## Configuration options
+
+ * `certValidation` Set this to `'Y'` or `'N'`. If set to `'N'`, this component skips validation of the SSL certificate; otherwise it downloads the certificate and validates it.
+ * `userCred` Set this to `'userid:password'` as required by the remote HDFS cluster for accessing a file.
+ * `partitions` The number of parallel connections the data source opens per file to read data from the remote HDFS cluster. If this option is not specified, a default value of 4 is used. The recommended value is the next integer above (file size / block size) in HDFS, or a multiple of it. For example, if the file size in HDFS is 0.95 GB and the block size of the file is 128 MB, use 8 (or a multiple of 8) partitions. However, the number of partitions should not be much more than the maximum number of parallel tasks your Spark cluster can spawn.
+ * `format` Format of the file. Currently only `'csv'` is supported; `'csv'` is assumed if this option is not specified.
+ * `output` Specify either `'LIST'` or `'Data'`. By default `'Data'` is assumed, which returns the actual data in the file. If a folder name is specified, data from all files in that folder is fetched at once. If `'LIST'` is specified, the files within the folder are listed instead (see the sketch after this list).
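To make the `output` option above concrete, here is a minimal, hedged sketch of listing a remote folder rather than reading file contents. It reuses the option names from the read example; the gateway URL, folder path, and `user1:pass1` credentials are placeholder values, and the exact columns of the returned listing depend on this data source's implementation.

```scala
// Minimal sketch, not a verbatim excerpt from this module: list the files in a
// remote folder via the WebHDFS data source instead of reading their contents.
// The URL, folder path, and "user1:pass1" credentials are placeholders.
val folderPath = "webhdfs:///gateway/default/webhdfs/v1/some/folder/"

val listingDf = spark.read
  .format("webhdfs")
  .option("certValidation", "N")     // skip SSL certificate validation for this example
  .option("userCred", "user1:pass1") // userid:password expected by the remote cluster
  .option("output", "LIST")          // return the folder listing instead of file data
  .load(folderPath)

listingDf.show(truncate = false)
```

Run from `spark-shell` with the `--packages` coordinate shown in the Linking section, this should yield one row per file in the folder, subject to how the data source renders the listing.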
diff --git a/datasource-webhdfs/pom.xml b/datasource-webhdfs/pom.xml new file mode 100755 index 00000000..7e2f5ae9 --- /dev/null +++ b/datasource-webhdfs/pom.xml @@ -0,0 +1,63 @@ + + + + + 4.0.0 + + org.apache.bahir + bahir-parent_2.11 + 2.1.0-SNAPSHOT + ../pom.xml + + + org.apache.bahir + spark-datasource-webhdfs_2.11 + + datasource-webhdfs + + jar + Apache Bahir - Spark DataSource WebHDFS + http://bahir.apache.org/ + + + + org.scalaj + scalaj-http_${scala.binary.version} + 2.3.0 + + + org.apache.spark + spark-tags_${scala.binary.version} + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.apache.maven.plugins + maven-source-plugin + + + + diff --git a/datasource-webhdfs/src/main/scala/org/apache/bahir/datasource/webhdfs/BahirWebHdfsDataSetWrapper.scala b/datasource-webhdfs/src/main/scala/org/apache/bahir/datasource/webhdfs/BahirWebHdfsDataSetWrapper.scala new file mode 100644 index 00000000..8ba35f94 --- /dev/null +++ b/datasource-webhdfs/src/main/scala/org/apache/bahir/datasource/webhdfs/BahirWebHdfsDataSetWrapper.scala @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.datasource.webhdfs + +import java.io._ +import java.net.URI + +import scala.collection.mutable.HashMap + +import org.apache.bahir.datasource.webhdfs.util.WebHdfsConnector +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.util.Progressable + + +/* + * This class contains functions for reading/writing data from/to remote webhdfs server in Spark + * DataSource + */ +class BahirWebHdfsFileSystem extends FileSystem { + + // TODO: use a logger here + // scalastyle:off println + println(s" - - - ${this.getClass.getSimpleName} loaded - - - ") + // scalastyle:on println + + var uri: URI = null + var rHdfsUri: URI = null + var conf: Configuration = null + var workingDir = null + + var readFullFile = false + var usrCred = "" + var connections = 0 + var certValidation = "Y" + + var fileStatusMap: HashMap[String, FileStatus] = HashMap() + var listStatusMap: HashMap[String, Array[FileStatus]] = HashMap() + + override def getUri(): URI = uri + + + /** + * This method does necessary initialization of the configuration parameters + */ + override def initialize(uriOrg: URI, + confOrg: Configuration): Unit = { + + super.initialize(uriOrg, confOrg) + + setConf(confOrg) + + rHdfsUri = uriOrg + conf = confOrg + + val usrCredStr = conf.get("usrCredStr") + usrCred = if (usrCredStr == null) { + throw new Exception("User Credential Has To Be Specified For The Remote HDFS") + } else usrCredStr.toString + + val certFile = conf.get("certTrustStoreFile") + val certPwd = conf.get("certTrustStorePwd") + + certValidation = if (certFile == null || certPwd == null) "N" else s"$certFile:$certPwd" + + uri = URI.create(uriOrg.getScheme() + "://" + uriOrg.getAuthority()) + +// println(s"BahirWebHdfsFileSystem: uri=${uri}, connections=${connections}, " + +// s"usercred=${usrCred}") + } + + override def getWorkingDirectory(): Path = { + + val path = new Path(rHdfsUri) + // println("Working Directory: " + path) + path + } + + override def setWorkingDirectory(dir: Path): Unit = {} + + override def rename(srcPath: Path, destPath: Path): Boolean = { + + val destPathModStr = Path.getPathWithoutSchemeAndAuthority(destPath).toString. + replace("/gateway/default/webhdfs/v1", "") + WebHdfsConnector. + renameFile(srcPath.toString, destPathModStr, certValidation, "10000:120000", usrCred) + } + + override def delete(srcPath: Path, recursive: Boolean): Boolean = { + WebHdfsConnector. + deleteFile(srcPath.toString, recursive, certValidation, "10000:120000", usrCred) + } + + override def mkdirs(srcPath: Path, permission: FsPermission): Boolean = { + WebHdfsConnector. 
+ makeDirectory(srcPath.toString, permission.toShort, certValidation, "10000:120000", usrCred) + } + + override def append(srcPath: Path, + bufferSize: Int, + progress: Progressable): FSDataOutputStream = { + throw new Exception("File Append Not Supported") + } + + override def getFileStatus(f: Path): FileStatus = { + val file = modifyFilePath(f).toString + var fStatus: FileStatus = fileStatusMap.getOrElse(f.toString, null) + + val fileStatus = if (fStatus == null) { + val fStatusMap = WebHdfsConnector.getFileStatus(file, certValidation, "10000:120000", usrCred) + if (fStatusMap != null) { + fStatus = createFileStatus(f, fStatusMap) + fileStatusMap.put(f.toString, fStatus) + } + fStatus + } + else { + fStatus + } + + // println("In bahir before returning from getFileStatis fileStatus : " + fileStatus) + fileStatus + } + + override def listStatus(f: Path): Array[FileStatus] = { + + val file = modifyFilePath(f).toString + + var lStatus: Array[FileStatus] = listStatusMap.getOrElse(f.toString, null) + + val listStatus = if (lStatus == null) { + val fStatusMapList = WebHdfsConnector + .getListStatus(file, certValidation, "10000:120000", usrCred) + val fileCount = fStatusMapList.length + lStatus = new Array[FileStatus](fileCount) + var i = 0 + while (i < fileCount) { + lStatus(i) = createFileStatus(f, fStatusMapList(i)) + i += 1 + } + listStatusMap.put(f.toString, lStatus) + lStatus + } + else { + lStatus + } + + listStatus + } + + private def createFileStatus(fPath: Path, statusMap: Map[String, Any]): FileStatus = { + + val lng = conf.get("length") + val partlng = if (lng == null) 1 else lng.toInt + + val blk = conf.get("block") + val partblk = if (blk == null) 1 else blk.toInt + + + val isDirFlg = if (statusMap.getOrElse("type", "") == "DIRECTORY") true else false + val pathSuffix = statusMap.getOrElse("pathSuffix", "") + val targetPath = if (pathSuffix == "") fPath else new Path(fPath.toString + "/" + pathSuffix) + val fStatus = new FileStatus( + statusMap.getOrElse("length", 0).asInstanceOf[Double].toLong * partlng, + isDirFlg, + statusMap.getOrElse("replication", 1).asInstanceOf[Double].toInt, + (statusMap.getOrElse("blockSize", 128000000).asInstanceOf[Double].toLong) / partblk, + statusMap.getOrElse("modificationTime", 0).asInstanceOf[Double].toLong, + statusMap.getOrElse("accessTime", 0).asInstanceOf[Double].toLong, + null, + statusMap.getOrElse("owner", "default").asInstanceOf[String], + statusMap.getOrElse("group", "default").asInstanceOf[String], + null, targetPath) + fStatus + } + + private def modifyFilePath(f: Path): Path = { + val wQryStr = f.toString.replace(getQryStrFromFilePath(f), "") + new Path(wQryStr) + } + + private def getQryStrFromFilePath(f: Path): String = { + val fileStr = f.toString + val start = fileStr.indexOf("&") + val end = fileStr.indexOf(";") + + val qryStr = if (start > 0) fileStr.substring(start, end) else "" + + qryStr + } + + override def open(f: Path, bs: Int): FSDataInputStream = { + + val fileStatus = getFileStatus(f) + val blockSize = fileStatus.getBlockSize + val fileLength = fileStatus.getLen + + val file = modifyFilePath(f) + + // print("file uri in open after modification : " + file + "\n") + + val qMap = getQryMapFromFilePath(f) + + val fConnections = if (qMap == null) { + 0 + } + else { + qMap.getOrElse("connections", "0").asInstanceOf[String].toInt + } + + val streamBufferSize = if (qMap == null) { + bs + } + else { + qMap.getOrElse("streamBufferSize", bs.toString).asInstanceOf[String].toInt + } + + val rdBufferSize = if (qMap == null) { + bs + } 
+ else { + qMap.getOrElse("readBufferSize", bs.toString).asInstanceOf[String].toInt + } + + val readBufferSize = if (rdBufferSize <= 0) blockSize else rdBufferSize.toLong + + val fReadFull = if (qMap == null) { + false + } + else { + qMap.getOrElse("readFullFile", false.toString).asInstanceOf[String].toBoolean + } + + val streamFlg = if (qMap == null) { + true + } + else { + qMap.getOrElse("streamFlg", true.toString).asInstanceOf[String].toBoolean + } + + + new FSDataInputStream(new BahirWebHdfsInputStream(file, streamBufferSize, readBufferSize, + blockSize, fileLength, + fReadFull, streamFlg, usrCred, fConnections, certValidation)) + } + + override def create(srcPath: Path, + permission: FsPermission, + overwriteFlg: Boolean, + bufferSize: Int, + replication: Short, + blockSize: Long, + progress: Progressable): FSDataOutputStream = { + + val file = modifyFilePath(srcPath) + + // println("file uri in create after modification : " + file) + + new FSDataOutputStream(new BahirWebHdfsOutputStream(file, bufferSize, blockSize, + permission.toShort, replication, overwriteFlg, usrCred, certValidation), null) + } + + private def getQryMapFromFilePath(f: Path): HashMap[String, String] = { + + val qryStr = getQryStrFromFilePath(f) + if (qryStr == "") null + else { + + val params = qryStr.replace(";", "").substring(1).split("&") + + val paramCount = params.length + + // print("params : " + params + " , lenth : " + paramCount + "\n") + var paramMap: HashMap[String, String] = new HashMap() + + var i = 0 + + while (i < paramCount) { + val paramKV = params(i).split("=") + paramMap.put(paramKV(0), paramKV(1)) + i += 1 + } + + // print("param map : " + paramMap + "\n") + paramMap + } + + } + +} + +class BahirWebHdfsInputStream(fPath: Path, + strmBufferSz: Int, + rdBufferSz: Long, + blockSz: Long, + fileSz: Long, + readFull: Boolean, + strmFlg: Boolean, + usrCrd: String, + conns: Int, + certValidation: String) + extends FSInputStream { + + val filePath: Path = fPath + val streamBufferSize: Int = strmBufferSz + val readBufferSize: Long = rdBufferSz + val blockSize: Long = blockSz + val fileSize: Long = fileSz + val readFullFlg: Boolean = readFull + val streamFlg: Boolean = strmFlg + val usrCred: String = usrCrd + val connections: Int = conns + val certValidationFlg: String = certValidation + + var pos = -1L + + var in: InputStream = null + + var callCount = 0 + + /* + * This is a dummy implementation as Spark does not use it. 
We need it here just to satisfy + * interface contract + */ + override def read(): Int = { + read(new Array[Byte](4056), 0, 100) + } + + override def read(b: Array[Byte], offset: Int, length: Int): Int = { + callCount += 1 + var bCount = in.read(b, offset, length) + + if (bCount < 0 && pos < fileSize) { + seek(pos) + bCount = in.read(b, offset, length) + } + + pos += bCount + bCount + + } + + override def seek(newPos: Long): Unit = { + // print("In seek - newpos : " + newPos + " , old pos : " + pos + "\n") + if (pos != newPos) { + pos = newPos + close + } + createWebHdfsInputStream(pos) + } + + private def createWebHdfsInputStream(pos: Long) = { + + val poe = if (connections == 0) { + if (blockSize > fileSize || readFullFlg == true) { + 0 + } else { + pos + blockSize + 100000 + } + } + else { + pos + fileSize/(connections - 1) + 100000 + } + + if (streamFlg == true) { + val inputStream = WebHdfsConnector + .getFileInputStream(filePath.toString(), streamFlg, readBufferSize, pos, poe, + certValidationFlg, "10000:120000", usrCred) + in = if (streamBufferSize <= 0) inputStream else { + new BufferedInputStream(inputStream, streamBufferSize) + } + } else { + val in = WebHdfsConnector + .getFileInputStream(filePath.toString(), streamFlg, readBufferSize, pos, poe, + certValidationFlg, "10000:120000", usrCred) + } + + } + + /* + * This is a dummy implementation as Spark does not use it. We need it here just to satisy + * interface contract + */ + override def seekToNewSource(targetPos: Long): Boolean = false + + override def getPos(): Long = pos + + override def close() : Unit = { + if (in != null) in.close + } + +} + + +class BahirWebHdfsOutputStream(fPath: Path, + bufferSz: Int, + blockSz: Long, + perms: Short, + replicationCnt: Short, + overwriteFlg: Boolean, + usrCrd: String, + certValidation: String) + extends OutputStream { + + val filePath: Path = fPath + val bufferSize: Int = bufferSz + val blockSize: Long = blockSz + val permission: Short = perms + val replication: Short = replicationCnt + val overwrite: Boolean = overwriteFlg + val usrCred: String = usrCrd + val certValidationFlg: String = certValidation + + + override def write(b: Int): Unit = { + + val singleByte : Array[Byte] = new Array(b)(1) + writeBytes(singleByte) + } + + override def write(b: Array[Byte]): Unit = { + writeBytes(b) + } + + override def write(b: Array[Byte], offset: Int, length: Int): Unit = { + writeBytes(b) + } + + private def writeBytes(b: Array[Byte]): Unit = { + WebHdfsConnector.writeFile(b, filePath.toString, permission, overwriteFlg, bufferSize, + replication, blockSize, certValidation, "10000:120000", usrCred) + } + +} diff --git a/datasource-webhdfs/src/main/scala/org/apache/bahir/datasource/webhdfs/package.scala b/datasource-webhdfs/src/main/scala/org/apache/bahir/datasource/webhdfs/package.scala new file mode 100644 index 00000000..e7ec7a28 --- /dev/null +++ b/datasource-webhdfs/src/main/scala/org/apache/bahir/datasource/webhdfs/package.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bahir.datasource + +/* + * WebHDFS data source for Apache Spark + */ +package object webhdfs { + +} diff --git a/datasource-webhdfs/src/main/scala/org/apache/bahir/datasource/webhdfs/util/SSLTrustStoreUtil.scala b/datasource-webhdfs/src/main/scala/org/apache/bahir/datasource/webhdfs/util/SSLTrustStoreUtil.scala new file mode 100644 index 00000000..fbb7bc9e --- /dev/null +++ b/datasource-webhdfs/src/main/scala/org/apache/bahir/datasource/webhdfs/util/SSLTrustStoreUtil.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bahir.datasource.webhdfs.util + +import java.io._ + +import scala.collection.mutable.HashMap +import scala.sys.process._ + +/** + * This Singleton is used to generate SSLTrustStore certification once. The assumption behind use of + * this trust store is that this code would be executed on a machine which would be accessible from + * all Spark executors which need to access the trust store + */ +object SSLTrustStore { + + var trustStoreFileMap: HashMap[String, File] = HashMap() + + /** + * This function checks the availability of truststore for a particular site. If not it creates + * a new one. + */ + def getCertDetails(path: String): Tuple2[File, String] = { + + print("path in ssltrust store getCertDetails : " + path + "\n") + + val pathComp = path.split("/") + + val srvr = pathComp(2) + + val trustStorePword = "ts-password" + + val currDir = ("pwd" !!).trim + val trustStore = currDir + "/" + srvr + "_trustStore.jks" + + val os = new java.io.ByteArrayOutputStream + + val tsExist = (s"ls $trustStore" #> os).! + + val f = if (tsExist == 0) { + print("Using Existing Trust Store for SSL" + "\n") + new java.io.File(trustStore) + } + else { + val cert = srvr + "_cert" + val cfl = new File(cert) + + (s"openssl s_client -showcerts -connect $srvr" #| "openssl x509 -outform PEM" #> cfl).! + (s"keytool -import -trustcacerts -alias hadoop -file $cert -keystore $trustStore" + + s" -storepass $trustStorePword -noprompt").! + (s"rm -f $cert").! 
+ new java.io.File(trustStore) + } + + new Tuple2(f, trustStorePword) + } +} diff --git a/datasource-webhdfs/src/main/scala/org/apache/bahir/datasource/webhdfs/util/WebHdfsConnector.scala b/datasource-webhdfs/src/main/scala/org/apache/bahir/datasource/webhdfs/util/WebHdfsConnector.scala new file mode 100644 index 00000000..43cbb71d --- /dev/null +++ b/datasource-webhdfs/src/main/scala/org/apache/bahir/datasource/webhdfs/util/WebHdfsConnector.scala @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bahir.datasource.webhdfs.util + +import java.io._ +import java.net.{HttpURLConnection, URL} +import java.security._ +import javax.net.ssl.{SSLContext, SSLSocketFactory, TrustManagerFactory} + +import scala.annotation.switch + +import scalaj.http.{Http, HttpOptions} + +/** + * This object contains all utility functions for reading/writing data from/to remote webhdfs + * server. The abstraction maintained in this layer is at the level of RDD + */ +object WebHdfsConnector { + + /* + * This function returns a Tuple for credential store which contains flag for validating + * Certificate, the Certificate File object and Certificate File Object password + * the value of the parameter cred is "N" it ignores certificate validation, if + * it is "Y" then it downloads a certificate using openssl, else it expects a valid + * path for trust store and password for using the same + */ + def createTrustStoreCredForExecutors(cred: String, path: String): Tuple3[String, File, String] = { + val trustStoreMap = { + if (cred != "") { + if (cred == "N") { + new Tuple3("N", null, "") + } else if (cred == "Y") { + val tsd = SSLTrustStore.getCertDetails(path) + new Tuple3("Y", tsd._1, tsd._2) + } else { + val tsd = cred.split(":") + new Tuple3("Y", new java.io.File(tsd(0)), tsd(1)) + } + } else { + new Tuple3("", null, "") + } + } + trustStoreMap + } + + + /** + * This function returns a SSLSocketFactory which needs to be used in HTTP connection library in + * case Certificate to be validated + */ + def biocSslSocketFactory(fl: File, pswrd: String): SSLSocketFactory = { + val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()) + val ks = KeyStore.getInstance("JKS") + val fis = new java.io.FileInputStream(fl) + ks.load(fis, pswrd.toCharArray()); + tmf.init(ks); + + val sslc = SSLContext.getInstance("SSL") + + sslc.init(null, tmf.getTrustManagers(), null) + + sslc.getSocketFactory() + } + + /** + * This function creates a directory in remote HDFS + */ + def makeDirectory(path: String, + permission: Short, + trustStoreCredStr: String, + connStr: String, + userCredStr: String): Boolean = { + /* + val webHdfsChkDirOpr = "op=GETFILESTATUS" + val returnChkDir = callWebHdfsAPI(path, null, "GET", "CODE", 
trustStoreCredStr, userCredStr, + connStr, webHdfsChkDirOpr, "String").asInstanceOf[Integer] + */ + + val webHdfsMakeDirOpr = s"op=MKDIRS&permission=$permission" + val returnMakeDir = callWebHdfsAPI(path, "".getBytes(), "PUT", "CODE", + trustStoreCredStr, userCredStr, + connStr, webHdfsMakeDirOpr, "String").asInstanceOf[Integer] + if (returnMakeDir != 200) { + throw new Exception("The Directory could not be created , Src path,Code: " + + path + " , " + returnMakeDir + "\n") + } else true + } + + + /** + * This function deletes a file/directory recursively in remote HDFS + */ + def deleteFile(path: String, + recursiveFlg: Boolean, + trustStoreCredStr: String, + connStr: String, + userCredStr: String): Boolean = { + + /* + val webHdfsChkDirOpr = "op=GETFILESTATUS" + val returnChkDir = callWebHdfsAPI(path, null, "GET", "CODE", trustStoreCredStr, userCredStr, + connStr, webHdfsChkDirOpr, "String").asInstanceOf[Integer] + */ + + val webHdfsDeleteDirOpr = s"op=DELETE&recursive=$recursiveFlg" + val returnDelDir = callWebHdfsAPI(path, null, "DELETE", "CODE", trustStoreCredStr, userCredStr, + connStr, webHdfsDeleteDirOpr, "String").asInstanceOf[Integer] + if (returnDelDir != 200) { + throw new Exception("The File/Directory could not be renamed , Src path,Dest path,Code: " + + path + " , " + returnDelDir + "\n") + } else true + } + + /** + * This function writes 1 file in remote HDFS + */ + def writeFile(data: Array[Byte], + path: String, + permission: Short, + overwriteFlg: Boolean, + bufferSize: Int, + replication: Short, + blockSize: Long, + trustStoreCredStr: String, + connStr: String, + userCredStr: String): Boolean = { + + /* + val webHdfsChkFileOpr = "op=GETFILESTATUS" + val returnChkFile = callWebHdfsAPI(path, null, "GET", "CODE", trustStoreCredStr, userCredStr, + connStr, webHdfsChkFileOpr, "String").asInstanceOf[Integer] + + val retCode = if (returnChkFile == 200) { + // print("in writeFile, file exists : createdCode : " + returnChkFile + "\n") + true + } else { + */ + + val webHdfsCreateOpr = s"op=CREATE&overwrite=$overwriteFlg&blockSize=$blockSize" + + s"&replication=$replication&bufferSize=$bufferSize" + + s"&permission=$permission" + val createUrl = callWebHdfsAPI(path, "".getBytes(), "PUT", "LOCATION", + trustStoreCredStr, userCredStr, + connStr, webHdfsCreateOpr, "String").asInstanceOf[String] + val createdCode = callWebHdfsAPI(createUrl, data, "PUT", "CODE", trustStoreCredStr, + userCredStr, connStr, webHdfsCreateOpr, "String").asInstanceOf[Integer] + + // print("in writeFile, creaedCode : " + createdCode + "\n") + + true + /* + } + + retCode + */ + + } + + /** + * This function renames 1 file in remote HDFS + */ + def renameFile(path: String, + destPath: String, + trustStoreCredStr: String, + connStr: String, + userCredStr: String): Boolean = { + /* + val webHdfsChkFileOpr = "op=GETFILESTATUS" + val returnChkFile = callWebHdfsAPI(path, null, "GET", "CODE", trustStoreCredStr, userCredStr, + connStr, webHdfsChkFileOpr, "String").asInstanceOf[Integer] + */ + + val webHdfsRenameOpr = s"op=RENAME&destination=$destPath" + val returnRename = callWebHdfsAPI(path, "".getBytes(), "PUT", "CODE", + trustStoreCredStr, userCredStr, + connStr, webHdfsRenameOpr, "String").asInstanceOf[Integer] + if (returnRename != 200) { + throw new Exception("The File/Directory could not be renamed , Src path,Dest path,Code: " + + path + " , " + destPath + " , " + returnRename + "\n") + } + else { + true + } + } + + + + def callWebHdfsAPI(path: String, + data: Array[Byte], + method: String, + respType: String, + 
trustStoreCredStr: String, + usrCredStr: String, + connStr: String, + opr: String, + outputType: String): Any = { + + // print("path in callWebHdfs : " + path + " , opr : " + opr + "\n") + + val pathComp = path.split(":") + + val trustCred = createTrustStoreCredForExecutors(trustStoreCredStr, path) + + val conns = connStr.split(":") + + val connProp = Array(conns(0).toInt, conns(1).toInt) + + val usrCred = usrCredStr.split(":") + + val uri = (if (trustCred._1 != "") "https:" else "http:") + pathComp(1) + ":" + pathComp(2) + + "?" + opr + + var httpc = Http(uri).auth(usrCred(0), usrCred(1)).timeout(connTimeoutMs = connProp(0), + readTimeoutMs = connProp(1)) + + httpc = (method: @switch) match { + case "GET" => httpc + case "PUT" => httpc.put(data).header("content-type", "application/bahir-webhdfs") + case "DELETE" => httpc.method("DELETE") + case "POST" => httpc.postData(data).header("content-type", "application/bahir-webhdfs") + } + + httpc = (trustCred._1: @switch) match { + case "" => httpc + case "N" => httpc.option(HttpOptions.allowUnsafeSSL) + case "Y" => httpc.option(HttpOptions.sslSocketFactory(biocSslSocketFactory(trustCred._2, + trustCred._3))) + } + + val resp = (respType : @switch) match { + case "BODY" => httpc.asString.body + case "BODY-BYTES" => httpc.asBytes.body + case "BODY-STREAM" => getBodyStream(httpc) + case "CODE" => httpc.asString.code + case "HEADERS" => httpc.asString.headers + case "LOCATION" => httpc.asString.location.mkString(" ") + } + + resp + } + + private def getBodyStream(httpReq: scalaj.http.HttpRequest) : InputStream = { + + val conn = (new URL(httpReq.urlBuilder(httpReq))).openConnection.asInstanceOf[HttpURLConnection] + + HttpOptions.method(httpReq.method)(conn) + + httpReq.headers.reverse.foreach{ case (name, value) => + conn.setRequestProperty(name, value) + } + + httpReq.options.reverse.foreach(_(conn)) + + httpReq.connectFunc(httpReq, conn) + + conn.getInputStream + + } + + def getFileInputStream(filePath: String, + streamFlg: Boolean, + bufferSize: Long, + offset: Long, + end: Long, + trustStoreCredStr: String, + connStr: String, + usrCredStr: String): InputStream = { + + // print("path in getFileInputStream : " + filePath + " , bufferSize : " + bufferSize + "\n") + + var length = 0L + + val fileGetOpr = if (end > 0) { + length = end - offset + s"op=OPEN&offset=$offset&length=$length&bufferSize=$bufferSize" + } else { + s"op=OPEN&offset=$offset&bufferSize=$bufferSize" + } + + val getUrl = callWebHdfsAPI(filePath, null, "GET", "LOCATION", trustStoreCredStr, usrCredStr, + connStr, fileGetOpr, "String").asInstanceOf[String] + + if (streamFlg == true) { + callWebHdfsAPI(getUrl, null, "GET", "BODY-STREAM", + trustStoreCredStr, usrCredStr, + connStr, fileGetOpr, "Stream").asInstanceOf[InputStream] + } else { + val content = callWebHdfsAPI(getUrl, null, "GET", "BODY-BYTES", + trustStoreCredStr, usrCredStr, + connStr, fileGetOpr, "Stream").asInstanceOf[Array[Byte]] + new ByteArrayInputStream(content) + } + + } + + def getFileStatus(filePath: String, + trustStoreCredStr: String, + connStr: String, + usrCredStr: String): Map[String, Any] = { + + val fileStatusOpr = s"op=GETFILESTATUS" + val returnChk = callWebHdfsAPI(filePath, null, "GET", "CODE", trustStoreCredStr, usrCredStr, + connStr, fileStatusOpr, "String").asInstanceOf[Integer] + // print("after file status check in getFileStatus in WebHdfsConnector : " + returnChk + "\n") + + if (returnChk == 200) { + + // print("within return code 200 in getFileStatus : " + returnChk + "\n") + val fileStatus = 
callWebHdfsAPI(filePath, null, "GET", "BODY", trustStoreCredStr, usrCredStr, + connStr, fileStatusOpr, "String").asInstanceOf[String] + + if (fileStatus.contains("RemoteException")) { + null + } + else { + val responseMap = scala.util.parsing.json.JSON.parseFull(fileStatus).toList(0) + .asInstanceOf[Map[String, Map[String, Any]]] + + responseMap.getOrElse("FileStatus", null) + } + } + else { + null + } + } + + def getListStatus(filePath: String, + trustStoreCredStr: String, + connStr: String, + usrCredStr: String): List[Map[String, Any]] = { + val listStatusOpr = s"op=LISTSTATUS" + + val listStatus = callWebHdfsAPI(filePath, null, "GET", "BODY", trustStoreCredStr, usrCredStr, + connStr, listStatusOpr, "String").asInstanceOf[String] + + if (listStatus.contains("RemoteException")) { + throw new Exception(listStatus) + } + + scala.util.parsing.json.JSON.parseFull(listStatus).toList(0) + .asInstanceOf[Map[String, Map[String, Any]]].get("FileStatuses").get("FileStatus") + .asInstanceOf[List[Map[String, Any]]] + } + +} diff --git a/pom.xml b/pom.xml index a7ae91e4..c6ed89ea 100644 --- a/pom.xml +++ b/pom.xml @@ -80,6 +80,7 @@ sql-streaming-mqtt streaming-twitter streaming-zeromq + datasource-webhdfs @@ -446,6 +447,7 @@ .classpath .project **/dependency-reduced-pom.xml + **/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister **/target/** **/README.md **/examples/data/*.txt