Description
I am facing a major issue replacing synonyms in my Dataset.
I am trying to replace each synonym of a brand name with its canonical name.
I have tried two methods to solve this.
Method 1 (regexp_replace)
Here I am using the regexp_replace method.
Hashtable<String, String> manufacturerNames = new Hashtable<>();
Enumeration<String> names;
String str;
manufacturerNames.put("Allen","Apex Tool Group");
manufacturerNames.put("Armstrong","Apex Tool Group");
manufacturerNames.put("Campbell","Apex Tool Group");
manufacturerNames.put("Lubriplate","Apex Tool Group");
manufacturerNames.put("Delta","Apex Tool Group");
manufacturerNames.put("Gearwrench","Apex Tool Group");
manufacturerNames.put("H.K. Porter","Apex Tool Group");
/* ....100 MORE.... */
manufacturerNames.put("Stanco","Stanco Mfg");
manufacturerNames.put("Stanco","Stanco Mfg");
manufacturerNames.put("Standard Safety","Standard Safety Equipment Company");
manufacturerNames.put("Standard Safety","Standard Safety Equipment Company");
// Iterate over all synonym keys in the hash table.
names = manufacturerNames.keys();
Dataset<Row> dataFileContent = sqlContext.load("com.databricks.spark.csv", options);
while (names.hasMoreElements()) {
    str = (String) names.nextElement();
    dataFileContent = dataFileContent.withColumn("ManufacturerSource",
        regexp_replace(col("ManufacturerSource"), str, manufacturerNames.get(str).toString()));
}
dataFileContent.show();
I was told that the amount of data is too large for regexp_replace, and was advised to use a UDF instead:
http://stackoverflow.com/questions/43413513/issue-in-regex-replace-in-apache-spark-java
Method 2 (UDF)
List<Row> data2 = Arrays.asList(
RowFactory.create("Allen", "Apex Tool Group"),
RowFactory.create("Armstrong","Apex Tool Group"),
RowFactory.create("DeWALT","StanleyBlack")
);
StructType schema2 = new StructType(new StructField[] {
    new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
    new StructField("sentence2", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);
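UDF2<String, String, Boolean> contains = new UDF2<String, String, Boolean>() {
    private static final long serialVersionUID = -5239951370238629896L;
    @Override
    public Boolean call(String t1, String t2) throws Exception {
        // Body missing from the post; assumed to be a substring test, based on the UDF name.
        return t1.contains(t2);
    }
};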
spark.udf().register("contains", contains, DataTypes.BooleanType);
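UDF3<String, String, String, String> replaceWithTerm = new UDF3<String, String, String, String>() {
    private static final long serialVersionUID = -2882956931420910207L;
    @Override
    public String call(String t1, String t2, String t3) throws Exception {
        // Body missing from the post; assumed to replace every occurrence of t2 in t1 with t3,
        // which is consistent with the output shown below.
        return t1.replace(t2, t3);
    }
};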
spark.udf().register("replaceWithTerm", replaceWithTerm, DataTypes.StringType);
Dataset<Row> joined = sentenceDataFrame.join(sentenceDataFrame2, callUDF("contains", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2")))
.withColumn("sentence_replaced", callUDF("replaceWithTerm", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2"), sentenceDataFrame2.col("sentence2")))
.select(col("sentence_replaced"));
joined.show(false);
}
I got this output when a single row needs more than one replacement.
Input-
Allen Armstrong jeevi pramod Allen
sandesh Armstrong jeevi
harsha nischay DeWALT
Output-
Apex Tool Group Armstrong jeevi pramod Apex Tool Group
Allen Apex Tool Group jeevi pramod Allen
sandesh Apex Tool Group jeevi
harsha nischay StanleyBlack
Expected Output-
Apex Tool Group Apex Tool Group jeevi pramod Apex Tool Group
sandesh Apex Tool Group jeevi
harsha nischay StanleyBlack
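The extra row in the output is consistent with how the join works: it emits one row per (sentence, matching synonym) pair, and each of those rows has only that one synonym replaced. A sentence containing both "Allen" and "Armstrong" therefore appears twice, each copy only partly rewritten. One possible way to get the expected output is to apply every synonym to a sentence in a single pass. The sketch below shows that idea in plain Java, without Spark; the class name SynonymReplacer and its methods are hypothetical, and wrapping replace(...) in a UDF1<String, String> is an assumption that has not been tested against the dataset described here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper (not part of Spark): replaces every known synonym
// in a string in one pass over the input.
final class SynonymReplacer {
    private final Map<String, String> synonyms;
    private final Pattern pattern;

    SynonymReplacer(Map<String, String> synonyms) {
        this.synonyms = new LinkedHashMap<>(synonyms);
        // Build one alternation of all keys. Pattern.quote makes names such as
        // "H.K. Porter" match literally; longer keys are tried first so that a
        // shorter key cannot shadow a longer one that starts with it.
        String alternation = this.synonyms.keySet().stream()
                .sorted((a, b) -> Integer.compare(b.length(), a.length()))
                .map(Pattern::quote)
                .collect(Collectors.joining("|"));
        this.pattern = Pattern.compile(alternation);
    }

    String replace(String input) {
        if (synonyms.isEmpty()) {
            return input; // nothing to replace
        }
        Matcher m = pattern.matcher(input);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            // quoteReplacement keeps '$' and '\' in the target name literal.
            m.appendReplacement(out, Matcher.quoteReplacement(synonyms.get(m.group())));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> names = new LinkedHashMap<>();
        names.put("Allen", "Apex Tool Group");
        names.put("Armstrong", "Apex Tool Group");
        names.put("DeWALT", "StanleyBlack");
        SynonymReplacer replacer = new SynonymReplacer(names);
        // prints "Apex Tool Group Apex Tool Group jeevi pramod Apex Tool Group"
        System.out.println(replacer.replace("Allen Armstrong jeevi pramod Allen"));
    }
}
```

Like the original code, this matches substrings rather than whole words. Because each input string yields exactly one output string, no join is involved and no row is duplicated.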
Is there another method I should follow to get the correct output, or is this a limitation of UDFs?
Kindly help us with this issue.