1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
| package com.jd.union.spark
import java.io.IOException import java.security.MessageDigest import java.text.SimpleDateFormat import java.util import java.util._
import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.client.{ConnectionFactory, HBaseAdmin, _} import org.apache.hadoop.hbase.util.Bytes
/**
 * Small HBase client job: `main` writes one hard-coded sample SKU JSON document to a table,
 * reads it back and scans the table; the remaining methods are table/row helpers.
 *
 * NOTE(review): the members below are evaluated when the object is initialized, so the HBase
 * connection is opened as soon as the object is first touched.
 */
object SkuDataToHBaseJob extends Serializable {
  // Formats dates as yyyyMMdd; `main` uses it as the date prefix of the row-key seed.
  // NOTE(review): SimpleDateFormat is not thread-safe — confirm it is only used from one thread.
  val dateFormat = new SimpleDateFormat("yyyyMMdd")

  // HBase client configuration with a hard-coded ZooKeeper quorum host and client port.
  var conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "192.168.2.100")
  conf.set("hbase.zookeeper.property.clientPort", "2181")

  // Shared connection from which every method in this object obtains its table/admin handles.
  val conn = ConnectionFactory.createConnection(conf)
/**
 * Entry point: stores one hard-coded sample SKU JSON document in the `union_search_sku`
 * table (family `cf`, qualifier `value`), reads it back by row key and then scans the table.
 *
 * The row key is the MD5 hex digest of `<yyyyMMdd>$<skuId>`. Any exception is printed and
 * swallowed; the shared connection is closed on the way out.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val skuId = "129399424324"
  // Plain concatenation on purpose: "$" is a literal separator, not an interpolation marker.
  val orKey = dateFormat.format(new Date()) + "$" + skuId
  val rowkey = md5(orKey)
  // Sample payload, stored verbatim as the cell value.
  val data: String = " {\"abParam\":\"nature\",\"abVersion\":\"comm\",\"adowner\":\"p_10026704\",\"bestCouponId\":\"92737319\",\"brandCode\":382820,\"brandName\":\"谷邦(GUBANG)\",\"cid1\":1620,\"cid1Name\":\"家居日用\",\"cid2\":13780,\"cid2Name\":\"收纳用品\",\"cid3\":13785,\"cid3Name\":\"收纳架/篮\",\"color\":\"2个装北欧粉\",\"comments\":11,\"couponId\":0,\"couponLink\":\"\",\"createTime\":\"2019-01-22T05:44:42.969Z\",\"deliveryType\":0,\"endTime\":\"2999-01-01 23:59:59\",\"goodComments\":11,\"goodCommentsShare\":100,\"hasCoupon\":1,\"imageUrl\":\"jfs/t1/10013/33/7229/24225/5c286dfaEacd0c6a2/f16290038d59eea6.jpg\",\"imgList\":\"jfs/t1/8948/19/11086/256803/5c286df9E594f189f/5a2af0eeda043878.jpg|jfs/t1/23576/23/3598/256769/5c286df9E1e32d799/62ced8312a144260.jpg|jfs/t1/10013/33/7229/24225/5c286dfaEacd0c6a2/f16290038d59eea6.jpg|jfs/t1/26566/34/4201/410351/5c2f5aa6Eb441c098/fc86afded2d6334d.png|jfs/t1/20665/7/3549/279989/5c286dfaE5a253816/1dfab92f62045e00.jpg|jfs/t1/20297/14/3513/296706/5c286df4Efecc5d55/971d9dad36ea6788.jpg\",\"inOrderComm30Days\":129.07,\"inOrderComm30DaysSku\":84.35,\"inOrderCount30Days\":58,\"inOrderCount30DaysSku\":36,\"isCare\":0,\"isHot\":1,\"isLock\":0,\"isNew\":0,\"isOversea\":0,\"isPinGou\":1,\"isSeckill\":0,\"isStuPrice\":0,\"itemTag\":0,\"lowestPrice\":9.90,\"majorSuppBrevityCode\":\"\",\"orientationFlag\":0,\"owner\":\"p\",\"pcCommission\":5.97,\"pcCommissionShare\":30.0,\"pcPrice\":19.90,\"pid\":40520301670,\"pingouActiveId\":15461534722120,\"pingouEnd\":\"2037-08-16 09:30:48\",\"pingouPrice\":9.90,\"pingouStart\":\"2018-12-30 15:05:02\",\"pingouTmCount\":2,\"planId\":1507475989,\"qualityScore\":0.0631,\"qualityScoreNew\":0.4147,\"rfId\":0,\"ruleType\":7,\"seckillOriPrice\":0.0,\"seckillPrice\":0.0,\"shelvesTm\":\"2018-12-30 15:04:32.0\",\"shopId\":863648,\"size\":\"\",\"skuId\":40520301670,\"skuName\":\"谷邦-两个装卫生间牙刷架壁挂式免打孔浴室置物吸盘梳子筒座牙膏杯吸壁收纳盒 2个装北欧粉\",\"startTime\":\"2019-01-11 00:00:00\",\"stuPrice\":0.0,\"themeFlag\":2,\"unionCoupon\":[{\"batchId\":92737319,\"beginTime\":\"2019-01-10 14:22:00\",\"couponKind\":3,\"couponType\":1,\"createTime\":\"2019-01-18T09:50:26.589Z\",\"createUser\":\"lisen43\",\"discount\":8.0,\"endTime\":\"2019-01-31 14:23:00\",\"expireType\":5,\"key\":\"3172a45b3f8242d2ba92141a7c26b66d\",\"link\":\"http://coupon.m.jd.com/coupons/show.action?key=3172a45b3f8242d2ba92141a7c26b66d&roleId=17184533&to=mall.jd.com/index-863648.html\",\"ownerType\":\"92,95,444,498\",\"platformType\":0,\"quota\":17.0,\"remainCnt\":19965,\"source\":3,\"updateTime\":\"2019-01-18T09:50:26.589Z\",\"useEndTime\":\"2019-01-31 23:59:59\",\"useStartTime\":\"2019-01-10 00:00:00\",\"venderId\":10026704}],\"updateTime\":\"2019-01-22T15:01:31.194Z\",\"venderName\":\"谷邦家居拼购店\",\"vid\":10026704,\"wareId\":12567523910,\"wlCommission\":5.97,\"wlCommissionShare\":30.0,\"wlPrice\":19.90}"
  val tableName = "union_search_sku"
  try {
    addData(rowkey, tableName, "cf", "value", data)
    get(rowkey, tableName)
    println("scan----------------")
    scan(tableName)
    println("创建表成功")
  } catch {
    // Best-effort demo run: report the failure and fall through to cleanup.
    case e: Exception => e.printStackTrace()
  } finally {
    // main is the last user of the shared connection; close it so the client's
    // resources are released instead of leaking until JVM exit.
    conn.close()
  }
}
/**
 * Returns the MD5 digest of `source` (encoded as UTF-8) as a 32-character lowercase hex string.
 *
 * Hashing is best-effort: if digesting fails (for example `source` is null) the error is
 * printed and an empty string is returned, matching the previous non-throwing contract.
 *
 * @param source text to hash
 * @return lowercase hex digest, or "" when hashing failed
 */
def md5(source: String): String = {
  val hex = new StringBuilder(32)
  try {
    val digest = MessageDigest.getInstance("MD5").digest(source.getBytes("utf-8"))
    // Iterate over the bytes themselves: the old `0 to length` index loop ran one past the
    // end and only "worked" because the resulting exception was silently swallowed.
    // Mask to 0..255 so negative bytes do not sign-extend into eight hex digits.
    digest.foreach(b => hex.append("%02x".format(b & 0xFF)))
  } catch {
    case e: Exception => e.printStackTrace()
  }
  hex.toString
}
/**
 * Writes a single cell to an HBase table.
 *
 * @param rowKey    row key of the cell
 * @param tableName name of the target table
 * @param cf        column family
 * @param column    column qualifier
 * @param value     cell value, stored as the bytes produced by `Bytes.toBytes`
 * @throws IOException if the table cannot be obtained or the put fails
 */
@throws[IOException]
def addData(rowKey: String, tableName: String, cf: String, column: String, value: String): Unit = {
  val put = new Put(Bytes.toBytes(rowKey))
  println("开始创建table")
  val table = conn.getTable(TableName.valueOf(tableName))
  println("创建结束")
  try {
    put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value))
    table.put(put)
    System.out.println("add data Success!")
  } finally {
    // The table handle was previously never released.
    table.close()
  }
}
/**
 * Fetches one row and prints the value, column family and qualifier of each of its cells.
 * Prints nothing when the row has no cells.
 *
 * @param rowKey    row key to look up
 * @param tableName name of the table to read from
 * @throws IOException if the table cannot be obtained or the read fails
 */
@throws[IOException]
def get(rowKey: String, tableName: String): Unit = {
  val get = new Get(Bytes.toBytes(rowKey))
  val table = conn.getTable(TableName.valueOf(tableName))
  try {
    val result: Result = table.get(get)
    // listCells() yields null for an empty Result (row not found); treat that as "no cells".
    Option(result.listCells()).foreach { cells =>
      val it = cells.iterator()
      while (it.hasNext) {
        val cell: Cell = it.next()
        println("value:" + Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
        // Family/qualifier offsets index into their own backing arrays, which are not
        // guaranteed to be the value array for every Cell implementation.
        println("column familly:" + Bytes.toString(cell.getFamilyArray, cell.getFamilyOffset, cell.getFamilyLength))
        println("Qualifier:" + Bytes.toString(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength))
      }
    }
  } finally {
    table.close()
  }
}
/**
 * Runs an unrestricted scan over a table and prints value, column family, qualifier and
 * timestamp for every cell, followed by a separator line.
 *
 * @param tableName name of the table to scan
 */
def scan(tableName: String): Unit = {
  val table = conn.getTable(TableName.valueOf(tableName))
  try {
    // Open the scanner inside its own try so a failure here is not masked by a
    // NullPointerException from closing a scanner that was never created.
    val rs: ResultScanner = table.getScanner(new Scan())
    try {
      val rows = rs.iterator()
      while (rows.hasNext) {
        // listCells() yields null for an empty Result; skip such rows.
        Option(rows.next().listCells()).foreach { cells =>
          val cellIt = cells.iterator()
          while (cellIt.hasNext) {
            val kv = cellIt.next()
            println("value:" + Bytes.toString(kv.getValueArray, kv.getValueOffset, kv.getValueLength))
            // Read family/qualifier from their own backing arrays rather than the value array.
            println("column familly:" + Bytes.toString(kv.getFamilyArray, kv.getFamilyOffset, kv.getFamilyLength))
            println("Qualifier:" + Bytes.toString(kv.getQualifierArray, kv.getQualifierOffset, kv.getQualifierLength))
            System.out.println("timestamp:" + kv.getTimestamp)
            System.out.println("-------------------------------------------")
          }
        }
      }
    } finally {
      rs.close()
    }
  } finally {
    table.close()
  }
}
/**
 * Creates a table with a single column family named `cf`.
 *
 * Any exception raised by the HBase admin call propagates to the caller.
 *
 * @param tableName name of the table to create
 */
def createTable(tableName: String): Unit = {
  // The Admin interface already exposes createTable; no downcast to HBaseAdmin is needed.
  val admin = conn.getAdmin
  try {
    val descriptor: HTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName))
    descriptor.addFamily(new HColumnDescriptor("cf"))
    admin.createTable(descriptor)
  } finally {
    // Release the admin handle even when table creation fails.
    admin.close()
  }
}
/**
 * Deletes an entire row from a table and prints a confirmation message on success.
 *
 * @param tableName name of the table to delete from
 * @param rowkey    row key of the row to remove
 */
def del(tableName: String, rowkey: String): Unit = {
  val table = conn.getTable(TableName.valueOf(tableName))
  try {
    table.delete(new Delete(Bytes.toBytes(rowkey)))
  } finally {
    // Close the handle even when the delete throws; previously it leaked on failure.
    table.close()
  }
  println("删除成功")
}
/**
 * Deletes every row that a scan restricted to the given timestamp range returns.
 *
 * The bounds are passed straight to `Scan.setTimeRange`. Failures are printed and swallowed
 * (best-effort cleanup); the table handle is always closed.
 *
 * @param tableName name of the table to clean
 * @param minTime   lower timestamp bound handed to the scan
 * @param maxTime   upper timestamp bound handed to the scan
 */
def deleteTimeRange(tableName: String, minTime: Long, maxTime: Long): Unit = {
  val table = conn.getTable(TableName.valueOf(tableName))
  try {
    val scan = new Scan()
    scan.setTimeRange(minTime, maxTime)
    // getDeleteList consumes the scanner and closes it.
    val deletes = getDeleteList(table.getScanner(scan))
    if (!deletes.isEmpty) {
      table.delete(deletes)
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    table.close()
  }
}
/**
 * Builds one `Delete` per row returned by the scanner, then closes the scanner.
 *
 * @param rs scanner to drain; it is closed before this method returns, even on failure
 * @return a mutable Java list with a `Delete` for each scanned row key (possibly empty)
 */
def getDeleteList(rs: ResultScanner): util.List[Delete] = {
  val list = new util.ArrayList[Delete]()
  try {
    // Walk the Java iterator directly: a Scala `for` over a Java Iterable only compiles
    // with an implicit Java-to-Scala conversion in scope, which this file does not import.
    val rows = rs.iterator()
    while (rows.hasNext) {
      list.add(new Delete(rows.next().getRow))
    }
  } finally {
    rs.close()
  }
  list
}
}
|