客快物流大数据项目(六十二):主题及指标开发 common包下定义的一些内容 一般有用 看1

news/发布时间2024/5/19 21:30:23

主题及指标开发

一、主题开发业务流程

二、离线模块初始化

1、创建包结构

2、​​​​​​​创建时间处理工具

3、​​​​​​​定义主题宽表及指标结果表的表名

4、​​​​​​​物流字典码表数据类型定义枚举类

5、​​​​​​​封装公共接口

主题及指标开发

一、主题开发业务流程

二、​​​​​​​离线模块初始化

1、​​​​​​​创建包结构

本次项目采用scala编程语言,因此创建scala目录

包名

说明

cn.it.logistics.offline

离线指标统计程序所在包

cn.it.logistics.offline.dwd

离线指标dwd层程序所在包

cn.it.logistics.offline.dws

离线指标dws层程序所在包

2、​​​​​​​创建时间处理工具

实现步骤:

  • 公共模块scala目录下的common程序包下创建DateHelper对象
    • 实现获取当前日期
    • 实现获取昨天日期
package cn.it.logistics.commonimport java.text.SimpleDateFormat
import java.util.Date/*** 时间处理工具类*/
object DateHelper {/*** 返回昨天的时间*/def getyesterday(format:String)={//当前时间减去一天(昨天时间)new SimpleDateFormat(format).format(new Date(System.currentTimeMillis() - 1000 * 60 * 60 * 24))}/*** 返回今天的时间* @param format*/def gettoday(format:String) = {//获取指定格式的当前时间new SimpleDateFormat(format).format(new Date)}
}

3、​​​​​​​定义主题宽表及指标结果表的表名

每个主题都需要拉宽操作将拉宽后的数据存储到kudu表中,同时指标计算的数据最终也需要落地到kudu表,因此提前将各个主题相关表名定义出来

实现步骤:

  • 公共模块scala目录下的common程序包下创建OfflineTableDefine单例对象
  • 定义各个主题相关的表名

参考代码:

package cn.it.logistics.common/*** 自定义离线计算结果表*/
object OfflineTableDefine {//快递单明细表val expressBillDetail  = "tbl_express_bill_detail"//快递单指标结果表val expressBillSummary = "tbl_express_bill_summary"//运单明细表val wayBillDetail = "tbl_waybill_detail"//运单指标结果表val wayBillSummary = "tbl_waybill_summary"//仓库明细表val wareHouseDetail = "tbl_warehouse_detail"//仓库指标结果表val wareHouseSummary = "tbl_warehouse_summary"//网点车辆明细表val dotTransportToolDetail = "tbl_dot_transport_tool_detail"//仓库车辆明细表val warehouseTransportToolDetail = "tbl_warehouse_transport_tool_detail"//网点车辆指标结果表val ttDotSummary = "tbl_dot_transport_tool_summary"//仓库车辆指标结果表val ttWsSummary = "tbl_warehouse_transport_tool_summary"//客户明细表数据val customerDetail = "tbl_customer_detail"//客户指标结果表数据val customerSummery = "tbl_customer_summary"
}

4、​​​​​​​物流字典码表数据类型定义枚举类

为了后续使用方便且易于维护,根据物流字典表的数据类型定义成枚举工具类,物流字典表的数据如下:

来自:tbl_codes表

name

type

注册渠道

1

揽件状态

2

派件状态

3

快递员状态

4

地址类型

5

网点状态

6

员工状态

7

是否保价

8

运输工具类型

9

运输工具状态

10

仓库类型

11

是否租赁

12

货架状态

13

回执单状态

14

出入库类型

15

客户类型

16

下单终端类型

17

下单渠道类型

18

实现步骤:

  • 公共模块scala目录下的common程序包下创建CodeTypeMapping对象
  • 根据物流字典表数据类型定义属性

实现过程:

  • 公共模块scala目录下的common程序包下创建CodeTypeMapping对象
  • 根据物流字典表数据类型定义属性
package cn.it.logistics.common/*** 定义物流字典编码类型映射工具类*/
class CodeTypeMapping {//注册渠道val RegisterChannel = 1//揽件状态val CollectStatus = 2//派件状态val DispatchStatus = 3//快递员状态val CourierStatus = 4//地址类型val AddressType = 5//网点状态val DotStatus = 6//员工状态val StaffStatus = 7//是否保价val IsInsured = 8//运输工具类型val TransportType = 9//运输工具状态val TransportStatus = 10//仓库类型val WareHouseType = 11//是否租赁val IsRent = 12//货架状态val GoodsShelvesStatue = 13//回执单状态val ReceiptStatus = 14//出入库类型val WarehousingType = 15//客户类型val CustomType = 16//下单终端类型val OrderTerminalType = 17//下单渠道类型val OrderChannelType = 18
}
object CodeTypeMapping extends CodeTypeMapping{
}

5、​​​​​​​封装公共接口

根据分析:主题开发数据的来源都是来自于kudu数据库,将数据进行拉宽或者将计算好的指标最终需要写入到kudu表中,因此根据以上流程抽象出来公共接口

实现步骤:

  • offline目录下创建OfflineApp单例对象
    • 定义数据的读取方法:getKuduSource
    • 定义数据的处理方法:execute
    • 定义数据的存储方法:save

参考代码:

package cn.it.logistics.offlineimport cn.it.logistics.common.{Configuration, DateHelper, Tools}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, date_format}/*** 根据不同的主题开发定义抽象方法* 1)数据读取* 2)数据处理* 3)数据保存*/
trait OfflineApp {/*** 读取kudu表的数据* @param sparkSession* @param tableName* @param isLoadFullData*/def getKuduSource(sparkSession: SparkSession, tableName:String, isLoadFullData:Boolean = false)= {if (isLoadFullData) {//加载全部的数据sparkSession.read.format(Configuration.SPARK_KUDU_FORMAT).options(Map("kudu.master" -> Configuration.kuduRpcAddress,"kudu.table" -> tableName,"kudu.socketReadTimeoutMs"-> "60000")).load().toDF()} else {//加载增量数据sparkSession.read.format(Configuration.SPARK_KUDU_FORMAT).options(Map("kudu.master" -> Configuration.kuduRpcAddress,"kudu.table" -> tableName,"kudu.socketReadTimeoutMs"-> "60000")).load().where(date_format(col("cdt"), "yyyyMMdd") === DateHelper.getyesterday("yyyyMMdd")).toDF()}}/*** 数据处理* @param sparkSession*/def execute(sparkSession: SparkSession)/*** 数据存储* dwd及dws层的数据都是需要写入到kudu数据库中,写入逻辑相同* @param dataFrame* @param isAutoCreateTable*/def save(dataFrame:DataFrame, tableName:String, isAutoCreateTable:Boolean = true): Unit = {//允许自动创建表if (isAutoCreateTable) {Tools.autoCreateKuduTable(tableName, dataFrame)}//将数据写入到kudu中dataFrame.write.format(Configuration.SPARK_KUDU_FORMAT).options(Map("kudu.master" -> Configuration.kuduRpcAddress,"kudu.table" -> tableName)).mode(SaveMode.Append).save()}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ulsteruni.cn/article/08876167.html

如若内容造成侵权/违法违规/事实不符,请联系编程大学网进行投诉反馈email:xxxxxxxx@qq.com,一经查实,立即删除!

相关文章

Sql Server设置用户只能查看并访问特定数据库

1.新建登录用户以管理员身份登陆数据库(权限最高的身份如sa),点击安全性->登录名,右键新建登录名,输入登录名和密码,取消强制实施密码策略。 2.将服务器角色设置为public 3.将public服务器角色的属性->取消查看所有数据库的权限点击安全性->服务器角色->publ…

KingbaseES V8R6集群运维案例之---级联备库upstream节点故障

KingbaseES V8R6集群运维案例之---级联备库upstream节点故障案例说明: 在KingbaseES V8R6集群,构建级联备库后,在其upstream的节点故障后,级联备库如何处理? 适用版本:KingbaseES V8R6 集群架构:案例一: 一、配置集群的recovery参数(all nodes) Tips: 关闭备库的aut…

基础优化方法

梯度下降梯度是一个向量(矢量),表示某一函数在该点处的方向导数沿着该方向取得最大值,即函数在该点处沿着该方向(此梯度的方向)变化最快,变化率最大(为该梯度的模)。梯度在物理学、机器学习和数学优化等领域有着广泛的应用。挑选一个初始值\(w_0\) 重复迭代参数t=1,2,…

第十次作业-20230917

一、在kubernetes 部署Redis Cluster并基于存储类实现数据持久化 1.1 准备 nfs 后端存储 root@k8s-ha1-238:~# mkdir -pv /data/k8sdata/myserver/{redis0,redis1,redis2,redis3,redis4,redis5} mkdir: created directory /data/k8sdata/myserver/redis0 mkdir: created direct…

springboot3接入nacos

参考:https://blog.csdn.net/qinguan111/article/details/132877842(连接不上nacos) https://verytoolz.com/yaml-formatter.html(yaml格式工具) 好吧,从昨天下午到今天快上午一直在被接入nacos这个问题拦在这, 1.一开始我就直接搜的springboot如何接入nacos build.grad…

el-table中实现列勾选和行勾选

功能展示: el-table可以整列勾选,整行勾选,整行和整列勾选,全选取消,单个勾选主要应用了el-table-column中的render-header方法,手动控制勾选状态 其中每行中的itemCheck${type},checked,isIndeterminate,以及 data中的 isCheck${type}, isIndeterminate${type}都是辅助参…