Big Data Tips
CentOS
Check the CentOS version
cat /etc/redhat-release
Configure a static IP
vim /etc/sysconfig/network-scripts/ifcfg-ens33
DEVICE=ens33
TYPE=Ethernet
ONBOOT=yes
BOOTPROTO=static
NAME="ens33"
IPADDR=192.168.10.102
PREFIX=24
GATEWAY=192.168.10.2
DNS1=192.168.10.2
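For the new address to take effect, restart the networking stack (a minimal sketch, assuming CentOS 7 with the legacy network service; newer releases manage this through nmcli instead):
systemctl restart network
ip addr show ens33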
Change the hostname
vim /etc/hostname
hadoop101
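On systemd-based releases such as CentOS 7, the same change can also be applied without editing the file (an alternative sketch, not part of the original notes):
hostnamectl set-hostname hadoop101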
Configure host mappings
vim /etc/hosts
192.168.10.101 hadoop101
192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104
Zookeeper
Start the client
bin/zkCli.sh -server 192.168.10.101:2181
List the contents of the current znode
[zk: localhost:2181(CONNECTED) 1] ls /
View detailed data of the current node
[zk: localhost:2181(CONNECTED) 1] ls -s /
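A few more basic zkCli operations that fit here (a sketch; the znode path /test is only an example):
[zk: localhost:2181(CONNECTED) 1] create /test "hello"
[zk: localhost:2181(CONNECTED) 1] get -s /test
[zk: localhost:2181(CONNECTED) 1] delete /test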
Hadoop
Check memory usage
jps
jmap -heap 2611   # 2611 is a PID obtained from jps
Start daemons manually
hdfs --daemon start namenode
hdfs --daemon start datanode
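The YARN daemons can be started the same way, and stop replaces start to shut a daemon down (a sketch, assuming Hadoop 3.x single-daemon scripts):
yarn --daemon start resourcemanager
yarn --daemon start nodemanager
hdfs --daemon stop namenode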
Kill processes in batch
kill -9 `jps | grep Sqoop | awk '{print $1}'`
Check the HA election
[zk: localhost:2181(CONNECTED) 1] get -s /hadoop-ha/marmot_cluster/ActiveStandbyElectorLock
[zk: localhost:2181(CONNECTED) 1] get -s /yarn-leader-election/cluster-yarn1/ActiveStandbyElectorLock
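The active/standby state can also be checked through the admin CLIs (a sketch; nn1 and rm1 are placeholder NameNode/ResourceManager IDs taken from hdfs-site.xml and yarn-site.xml):
hdfs haadmin -getServiceState nn1
yarn rmadmin -getServiceState rm1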
Atlas
Initial import of Hive metadata
/opt/module/atlas/hook-bin/import-hive.sh
Docker
Export and import images
Save:
docker save gethue/hue:latest -o hue.latest.tar
Load:
docker load -i hue.latest.tar
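To confirm the load and start a container from the image (a sketch; 8888 is Hue's default web port, adjust as needed):
docker images | grep hue
docker run -d --name hue -p 8888:8888 gethue/hue:latest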
Sqoop
Import into HDFS
sqoop import \
--connect "jdbc:mysql://172.28.252.106:3306/gmall" \
--username root \
--password root \
--table order_info \
--target-dir /origin_data/gmall/db/order_info/2023-04-30 \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'
tip
--num-mappers sets the number of map tasks (default 4). When it is omitted or set to a value greater than 1, the --split-by parameter must also be specified, as in the sketch below.
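For example, a parallel import of the same table might look like this sketch, assuming id is a fairly evenly distributed numeric primary key:
sqoop import \
--connect "jdbc:mysql://172.28.252.106:3306/gmall" \
--username root \
--password root \
--table order_info \
--target-dir /origin_data/gmall/db/order_info/2023-04-30 \
--delete-target-dir \
--num-mappers 4 \
--split-by id \
--fields-terminated-by '\t'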
Import into Hive
sqoop import \
--connect "jdbc:mysql://172.28.252.106:3306/gmall" \
--username root \
--password root \
--table order_info \
--num-mappers 1 \
--fields-terminated-by '\t' \
--hive-import \
--hive-overwrite \
--hive-database db_hive1 \
--hive-table ods_order_info \
--hive-partition-key dt \
--hive-partition-value 2023-05-19 \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'
Yarn: kill an application
yarn application -kill application_1698800224241_0753
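To look up the application ID before killing it, list the running applications:
yarn application -list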
Hive
Create tables
DROP TABLE IF EXISTS ods_order_info;
CREATE EXTERNAL TABLE ods_order_info (
`id` string COMMENT 'Order ID',
`final_total_amount` decimal(16,2) COMMENT 'Order amount',
`order_status` string COMMENT 'Order status',
`user_id` string COMMENT 'User ID',
`out_trade_no` string COMMENT 'Payment transaction number',
`create_time` string COMMENT 'Creation time',
`operate_time` string COMMENT 'Operation time',
`province_id` string COMMENT 'Province ID',
`benefit_reduce_amount` decimal(16,2) COMMENT 'Discount amount',
`original_total_amount` decimal(16,2) COMMENT 'Original amount',
`feight_fee` decimal(16,2) COMMENT 'Freight fee'
) COMMENT 'Order table'
PARTITIONED BY (`dt` string)
-- Partitions are created by date
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
-- Field delimiter is \t
STORED AS
-- Storage format: read with LzoTextInputFormat, write with TextOutputFormat
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_order_info/';
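To load the Sqoop output from HDFS into a partition of this table, something like the following sketch can be used (the path matches the --target-dir of the Sqoop import above; the dt value is an assumption):
LOAD DATA INPATH '/origin_data/gmall/db/order_info/2023-04-30' OVERWRITE INTO TABLE ods_order_info PARTITION (dt = '2023-04-30');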
DROP TABLE IF EXISTS dwd_order_info;
CREATE EXTERNAL TABLE dwd_order_info (
`id` string COMMENT 'Order ID',
`final_total_amount` decimal(16,2) COMMENT 'Order amount',
`order_status` string COMMENT 'Order status',
`user_id` string COMMENT 'User ID',
`out_trade_no` string COMMENT 'Payment transaction number',
`create_time` string COMMENT 'Creation time',
`operate_time` string COMMENT 'Operation time',
`province_id` string COMMENT 'Province ID',
`benefit_reduce_amount` decimal(16,2) COMMENT 'Discount amount',
`original_total_amount` decimal(16,2) COMMENT 'Original amount',
`feight_fee` decimal(16,2) COMMENT 'Freight fee'
) COMMENT 'Order table'
PARTITIONED BY (`dt` string)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_order_info/'
TBLPROPERTIES ("parquet.compression"="lzo");
Insert into the table
INSERT OVERWRITE TABLE dwd_order_info PARTITION (dt = '2023-03-07')
SELECT
id,
final_total_amount,
order_status,
user_id,
out_trade_no,
create_time,
operate_time,
province_id,
benefit_reduce_amount,
original_total_amount,
feight_fee
FROM
ods_order_info;
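A quick way to verify the load (a sketch):
SELECT id, order_status, final_total_amount FROM dwd_order_info WHERE dt = '2023-03-07' LIMIT 10;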
Kafka
Topics
bin/kafka-topics.sh
Parameter | Description |
---|---|
--bootstrap-server <String: server to connect to> | Host name and port of the Kafka broker to connect to. |
--topic <String: topic> | Name of the topic to operate on. |
--create | Create a topic. |
--delete | Delete a topic. |
--alter | Alter a topic. |
--list | List all topics. |
--describe | Describe a topic in detail. |
--partitions <Integer: # of partitions> | Set the number of partitions. |
--replication-factor <Integer: replication factor> | Set the partition replication factor. |
--config <String: name=value> | Override the system default configuration. |
List all topics
bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --list
Create a topic
bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 1 --replication-factor 3 --topic test
Describe a topic
bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test
Delete a topic
bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --delete --topic test
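Increase the partition count of a topic (a sketch; Kafka only allows increasing the number of partitions, never decreasing it):
bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --alter --topic test --partitions 3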
Producer
Parameter | Description |
---|---|
--bootstrap-server <String: server to connect to> | Host name and port of the Kafka broker to connect to. |
--topic <String: topic> | Name of the topic to operate on. |
Send messages
bin/kafka-console-producer.sh --bootstrap-server hadoop101:9092 --topic test
>hello world
Consumer
Parameter | Description |
---|---|
--bootstrap-server <String: server to connect to> | Host name and port of the Kafka broker to connect to. |
--topic <String: topic> | Name of the topic to operate on. |
Consume messages
bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic test
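Add --from-beginning to read the topic from the earliest offset instead of only new messages:
bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic test --from-beginning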
Kafka Connect
API
GET /connectors // Returns the names of all running connectors.
GET /connector-plugins // Returns the supported connector plugins.
POST /connectors // Creates a new connector.
GET /connectors/{name} // Returns information about the specified connector.
GET /connectors/{name}/config // Returns the configuration of the specified connector.
GET /connectors/{name}/status // Returns the status of the specified connector, including whether it is running, paused, or failed; if an error occurred, the details are listed as well.
PUT /connectors/{name}/config // Updates the configuration of the specified connector.
GET /connectors/{name}/tasks // Returns the running tasks of the specified connector.
GET /connectors/{name}/tasks/{taskid}/status // Returns the status of the specified connector task.
PUT /connectors/{name}/pause // Pauses the connector and its tasks; data processing stops until it is resumed.
PUT /connectors/{name}/resume // Resumes a paused connector.
POST /connectors/{name}/restart // Restarts a connector, typically used when the connector has failed.
POST /connectors/{name}/tasks/{taskId}/restart // Restarts a task, usually because it has failed.
DELETE /connectors/{name} // Deletes a connector, stopping all of its tasks and removing its configuration.
Example
Create a connector
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "192.168.99.100",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "Password!",
    "database.dbname": "testDB",
    "database.server.name": "fullfillment",
    "table.include.list": "dbo.customers",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.fullfillment"
}
}
1. The name of our connector when we register it with a Kafka Connect service.
2. The name of this SQL Server connector class.
3. The address of the SQL Server instance.
4. The port number of the SQL Server instance.
5. The name of the SQL Server user.
6. The password for the SQL Server user.
7. The name of the database to capture changes from.
8. Topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
9. A list of all tables whose changes Debezium should capture.
10. The list of Kafka brokers that this connector will use to write and recover DDL statements to the database history topic.
11. The name of the database history topic where the connector will write and recover DDL statements. This topic is for internal use only and should not be used by consumers.
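The connector above is registered through the REST API listed earlier, for example with curl (a sketch; it assumes the config is saved as inventory-connector.json and that Kafka Connect listens on its default port 8083):
curl -X POST -H "Content-Type: application/json" --data @inventory-connector.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/inventory-connector/status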
SQL Server CDC
Enable CDC on the database
Enable
EXEC sys.sp_cdc_enable_db
GO
Check whether CDC is enabled
select * from sys.databases where is_cdc_enabled = 1
Disable
exec sys.sp_cdc_disable_db
Enable CDC on a table
EXEC sys.sp_cdc_enable_table
@source_schema='dbo',
@source_name='people',
@role_name=null;
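To verify that the capture instance was created (a sketch; cdc.change_tables is populated once a table is CDC-enabled):
SELECT * FROM cdc.change_tables;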
Confirm that you have access to the CDC tables
EXEC sys.sp_cdc_help_change_data_capture;
Start the SQL Server Agent service
Open cmd as administrator.
net start SQLSERVERAGENT
Confirm that SQL Server Agent is running
EXEC sys.sp_cdc_help_change_data_capture
Flink SQL
SQL Client
Start Flink
bin/yarn-session.sh -d
tip
In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1686796443149_0001
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1686796443149_0001
Note that killing Flink might not clean up all job artifacts and temporary files.
Start the SQL client
bin/sql-client.sh -s yarn-session
Set the display mode
SET sql-client.execution.result-mode=tableau;
Create a table backed by the Debezium Kafka Connect topic
create table topic_people
(
id bigint,
openid string,
session_key string,
username string,
password string,
icon string,
email string,
nick_name string,
avatar_url string,
gender int,
note string,
create_time string,
login_time string,
status int,
device_id string
) WITH (
'connector' = 'kafka',
'topic' = 'health.dbo.people',
'properties.bootstrap.servers' = 'hadoop103:9092',
'properties.group.id' = 'testPeople',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
'scan.startup.mode' = 'latest-offset',
-- using 'debezium-json' as the format to interpret Debezium JSON messages
-- please use 'debezium-avro-confluent' if Debezium encodes messages in Avro format
'format' = 'debezium-json'
);
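Once the table is created, the changelog can be inspected directly in the SQL client (a sketch):
SELECT id, username, status, create_time FROM topic_people;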