Big Data Tips

CentOS

Check the CentOS version

cat /etc/redhat-release

Configure a static IP

vim /etc/sysconfig/network-scripts/ifcfg-ens33
DEVICE=ens33
TYPE=Ethernet
ONBOOT=yes
BOOTPROTO=static
NAME="ens33"
IPADDR=192.168.10.102
PREFIX=24
GATEWAY=192.168.10.2
DNS1=192.168.10.2
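After editing, apply the change by restarting the network service (a sketch, assuming CentOS 7 with the legacy network service; on NetworkManager-managed systems use nmcli or reboot instead):

systemctl restart network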

Change the hostname

vim /etc/hostname
hadoop101
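Alternatively, hostnamectl sets the name without editing the file by hand (assuming a systemd-based release, i.e. CentOS 7+):

hostnamectl set-hostname hadoop101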

Configure host mappings

vim /etc/hosts
192.168.10.101 hadoop101
192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104

Zookeeper

Start the client

bin/zkCli.sh -server 192.168.10.101:2181

List the contents of the current znode

[zk: localhost:2181(CONNECTED) 1] ls /

View detailed data of the current node

[zk: localhost:2181(CONNECTED) 1] ls -s /

Hadoop

Check memory usage

jps
jmap -heap 2611
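jmap -heap works on JDK 8; the option was removed in later JDKs, where the same view comes from jhsdb (a sketch; 2611 stands for whatever pid jps reports for the target process):

jhsdb jmap --heap --pid 2611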

Start daemons manually

hdfs --daemon start namenode
hdfs --daemon start datanode
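The YARN daemons can be started the same way (Hadoop 3.x syntax, matching the hdfs commands above):

yarn --daemon start resourcemanager
yarn --daemon start nodemanager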

Kill processes in batch

kill -9 `jps | grep Sqoop | awk '{print $1}'`

Check the HA election

[zk: localhost:2181(CONNECTED) 1] get -s /hadoop-ha/marmot_cluster/ActiveStandbyElectorLock
[zk: localhost:2181(CONNECTED) 1] get -s /yarn-leader-election/cluster-yarn1/ActiveStandbyElectorLock

Atlas

Initial import of Hive metadata

/opt/module/atlas/hook-bin/import-hive.sh

Docker

Export and import images

Save:

docker save gethue/hue:latest -o hue.latest.tar

Load:

docker load -i hue.latest.tar
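To confirm the image was loaded (same image name as saved above):

docker images gethue/hue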

Sqoop

Import into HDFS

sqoop import \
--connect "jdbc:mysql://172.28.252.106:3306/gmall" \
--username root \
--password root \
--table order_info \
--target-dir /origin_data/gmall/db/order_info/2023-04-30 \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'
tip

--num-mappers sets the number of map tasks (default 4). When it is omitted, or when --num-mappers is greater than 1, the --split-by option must also be specified.
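For example, a parallel import of the same table might look like this (a sketch; using the id column as the split key is an assumption about the table):

sqoop import \
--connect "jdbc:mysql://172.28.252.106:3306/gmall" \
--username root \
--password root \
--table order_info \
--target-dir /origin_data/gmall/db/order_info/2023-04-30 \
--delete-target-dir \
--num-mappers 4 \
--split-by id \
--fields-terminated-by '\t'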

Import into Hive

sqoop import \
--connect "jdbc:mysql://172.28.252.106:3306/gmall" \
--username root \
--password root \
--table order_info \
--num-mappers 1 \
--fields-terminated-by '\t' \
--hive-import \
--hive-overwrite \
--hive-database db_hive1 \
--hive-table ods_order_info \
--hive-partition-key dt \
--hive-partition-value 2023-05-19 \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'

Kill a Yarn application

yarn application -kill application_1698800224241_0753
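To find the application ID first, list the running applications:

yarn application -list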

Hive

Create tables

DROP TABLE IF EXISTS ods_order_info;
CREATE EXTERNAL TABLE ods_order_info (
    `id` string COMMENT 'order ID',
    `final_total_amount` decimal(16,2) COMMENT 'order amount',
    `order_status` string COMMENT 'order status',
    `user_id` string COMMENT 'user ID',
    `out_trade_no` string COMMENT 'payment transaction number',
    `create_time` string COMMENT 'creation time',
    `operate_time` string COMMENT 'operation time',
    `province_id` string COMMENT 'province ID',
    `benefit_reduce_amount` decimal(16,2) COMMENT 'discount amount',
    `original_total_amount` decimal(16,2) COMMENT 'original amount',
    `feight_fee` decimal(16,2) COMMENT 'freight fee'
) COMMENT 'order table'
PARTITIONED BY (`dt` string)
-- partitioned by date
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
-- field delimiter is \t
STORED AS
-- storage format: read with LzoTextInputFormat, write with TextOutputFormat
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_order_info/';

DROP TABLE IF EXISTS dwd_order_info;
CREATE EXTERNAL TABLE dwd_order_info (
    `id` string COMMENT 'order ID',
    `final_total_amount` decimal(16,2) COMMENT 'order amount',
    `order_status` string COMMENT 'order status',
    `user_id` string COMMENT 'user ID',
    `out_trade_no` string COMMENT 'payment transaction number',
    `create_time` string COMMENT 'creation time',
    `operate_time` string COMMENT 'operation time',
    `province_id` string COMMENT 'province ID',
    `benefit_reduce_amount` decimal(16,2) COMMENT 'discount amount',
    `original_total_amount` decimal(16,2) COMMENT 'original amount',
    `feight_fee` decimal(16,2) COMMENT 'freight fee'
) COMMENT 'order table'
PARTITIONED BY (`dt` string)
STORED AS parquet
LOCATION '/warehouse/gmall/dwd/dwd_order_info/'
TBLPROPERTIES ("parquet.compression"="lzo");

Insert into a table

INSERT OVERWRITE TABLE dwd_order_info PARTITION (dt = '2023-03-07')
SELECT
    id,
    final_total_amount,
    order_status,
    user_id,
    out_trade_no,
    create_time,
    operate_time,
    province_id,
    benefit_reduce_amount,
    original_total_amount,
    feight_fee
FROM ods_order_info;
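To spot-check the load from the shell, a quick partition count works (a sketch; it assumes hive is on the PATH and the table lives in your current database):

hive -e "select count(*) from dwd_order_info where dt='2023-03-07';"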

Kafka

Topics

bin/kafka-topics.sh
Parameter                                               Description
--bootstrap-server <String: server to connect to>      Kafka broker host:port to connect to.
--topic <String: topic>                                 Topic to operate on.
--create                                                Create a topic.
--delete                                                Delete a topic.
--alter                                                 Alter a topic.
--list                                                  List all topics.
--describe                                              Describe a topic in detail.
--partitions <Integer: # of partitions>                 Set the number of partitions.
--replication-factor <Integer: replication factor>      Set the replication factor.
--config <String: name=value>                           Override a default configuration value.

List all topics

bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --list

Create a topic

bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 1 --replication-factor 3 --topic test
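The --alter flag from the parameter list above can then grow the partition count (partitions can only be increased, never decreased):

bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --alter --topic test --partitions 3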

Describe a topic

bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test

Delete a topic

bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --delete --topic test

Producer

Parameter                                               Description
--bootstrap-server <String: server to connect to>      Kafka broker host:port to connect to.
--topic <String: topic>                                 Topic to operate on.

Send messages

bin/kafka-console-producer.sh --bootstrap-server hadoop101:9092 --topic test
>hello world

Consumer

Parameter                                               Description
--bootstrap-server <String: server to connect to>      Kafka broker host:port to connect to.
--topic <String: topic>                                 Topic to operate on.

Consume messages

bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic test
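To replay the topic from the earliest offset, add --from-beginning:

bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic test --from-beginning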

Kafka Connect

API

GET /connectors // Returns the names of all running connectors.
GET /connector-plugins // Returns the supported connector plugins.
POST /connectors // Creates a new connector.
GET /connectors/{name} // Returns information about the specified connector.
GET /connectors/{name}/config // Returns the configuration of the specified connector.
PUT /connectors/{name}/config // Updates the configuration of the specified connector.
GET /connectors/{name}/status // Returns the connector's status (running, paused, or failed), including error details if it has failed.
GET /connectors/{name}/tasks // Returns the tasks currently running for the specified connector.
GET /connectors/{name}/tasks/{taskid}/status // Returns the status of the specified connector task.
PUT /connectors/{name}/pause // Pauses the connector and its tasks, stopping data processing until it is resumed.
PUT /connectors/{name}/resume // Resumes a paused connector.
POST /connectors/{name}/restart // Restarts a connector, typically after it has failed.
POST /connectors/{name}/tasks/{taskId}/restart // Restarts a task, usually because it has failed.
DELETE /connectors/{name} // Deletes a connector, stopping all its tasks and removing its configuration.
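These endpoints are plain HTTP, so curl is enough to query them (a sketch; the worker address localhost:8083 is the default and an assumption here, and inventory-connector is the example connector registered below):

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/inventory-connector/status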

Example

Create a connector

{
  "name": "inventory-connector", - 1
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", - 2
    "database.hostname": "192.168.99.100", - 3
    "database.port": "1433", - 4
    "database.user": "sa", - 5
    "database.password": "Password!", - 6
    "database.dbname": "testDB", - 7
    "database.server.name": "fullfillment", - 8
    "table.include.list": "dbo.customers", - 9
    "database.history.kafka.bootstrap.servers": "kafka:9092", - 10
    "database.history.kafka.topic": "dbhistory.fullfillment" - 11
  }
}
  1. The name of our connector when we register it with a Kafka Connect service.
  2. The name of this SQL Server connector class.
  3. The address of the SQL Server instance.
  4. The port number of the SQL Server instance.
  5. The name of the SQL Server user.
  6. The password for the SQL Server user.
  7. The name of the database to capture changes from.
  8. Topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
  9. A list of all tables whose changes Debezium should capture.
  10. The list of Kafka brokers that this connector will use to write and recover DDL statements to the database history topic.
  11. The name of the database history topic where the connector will write and recover DDL statements. This topic is for internal use only and should not be used by consumers.
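To register the connector, POST this JSON to the Connect REST API (a sketch; the worker address localhost:8083 and the file name register-sqlserver.json are assumptions):

curl -X POST -H "Content-Type: application/json" --data @register-sqlserver.json http://localhost:8083/connectors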

For details, see sqlserver-example-configuration

SQL Server CDC

Enable CDC on the database

Enable

EXEC sys.sp_cdc_enable_db
GO

Check whether CDC is enabled

select * from sys.databases where is_cdc_enabled = 1

Disable

exec sys.sp_cdc_disable_db

Enable CDC on a table

exec sp_cdc_enable_table
@source_schema='dbo',
@source_name='people',
@role_name=null;

Confirm access to the CDC tables

EXEC sys.sp_cdc_help_change_data_capture;

Start the SQL Server Agent service

Open cmd as administrator:

net start SQLSERVERAGENT

Confirm SQL Server Agent is running

EXEC sys.sp_cdc_help_change_data_capture

Flink SQL Client

Start a yarn-session

bin/yarn-session.sh -d
tip

In order to stop Flink gracefully, use the following command:

$ echo "stop" | ./bin/yarn-session.sh -id application_1686796443149_0001

If this should not be possible, then you can also kill Flink via YARN's web interface or via:

$ yarn application -kill application_1686796443149_0001

Note that killing Flink might not clean up all job artifacts and temporary files.

Start the SQL client

bin/sql-client.sh -s yarn-session

Set the result display mode

SET sql-client.execution.result-mode=tableau;

Create a table over a Debezium Kafka Connect topic

create table topic_people
(
    id bigint,
    openid string,
    session_key string,
    username string,
    password string,
    icon string,
    email string,
    nick_name string,
    avatar_url string,
    gender int,
    note string,
    create_time string,
    login_time string,
    status int,
    device_id string
) WITH (
    'connector' = 'kafka',
    'topic' = 'health.dbo.people',
    'properties.bootstrap.servers' = 'hadoop103:9092',
    'properties.group.id' = 'testPeople',
    -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
    'scan.startup.mode' = 'latest-offset',
    -- using 'debezium-json' as the format to interpret Debezium JSON messages
    -- please use 'debezium-avro-confluent' if Debezium encodes messages in Avro format
    'format' = 'debezium-json'
);