1
1
import logging
2
+ from contextlib import contextmanager
2
3
from enum import Enum
3
4
from pathlib import Path
4
- from typing import List , Optional
5
+ from typing import Generator , List , Optional
5
6
6
7
import yaml
7
8
from pydantic import validator
20
21
logger = logging .getLogger (__name__ )
21
22
22
23
24
+ class StructuredPropertiesConfig :
25
+ """Configuration class to hold the graph client"""
26
+
27
+ _graph : Optional [DataHubGraph ] = None
28
+
29
+ @classmethod
30
+ @contextmanager
31
+ def use_graph (cls , graph : DataHubGraph ) -> Generator [None , None , None ]:
32
+ """Context manager to temporarily set a custom graph"""
33
+ previous_graph = cls ._graph
34
+ cls ._graph = graph
35
+ try :
36
+ yield
37
+ finally :
38
+ cls ._graph = previous_graph
39
+
40
+ @classmethod
41
+ def get_graph (cls ) -> DataHubGraph :
42
+ """Get the current graph, falling back to default if none set"""
43
+ return cls ._graph if cls ._graph is not None else get_default_graph ()
44
+
45
+
23
46
class AllowedTypes (Enum ):
24
47
STRING = "string"
25
48
RICH_TEXT = "rich_text"
@@ -41,25 +64,28 @@ class AllowedValue(ConfigModel):
41
64
description : Optional [str ] = None
42
65
43
66
67
+ VALID_ENTITY_TYPES_PREFIX_STRING = ", " .join (
68
+ [
69
+ f"urn:li:entityType:datahub.{ x } "
70
+ for x in ["dataset" , "dashboard" , "dataFlow" , "schemaField" ]
71
+ ]
72
+ )
73
+ VALID_ENTITY_TYPES_STRING = f"Valid entity type urns are { VALID_ENTITY_TYPES_PREFIX_STRING } , etc... Ensure that the entity type is valid."
74
+
75
+
44
76
class TypeQualifierAllowedTypes (ConfigModel ):
45
77
allowed_types : List [str ]
46
78
47
- @validator ("allowed_types" )
79
+ @validator ("allowed_types" , each_item = True )
48
80
def validate_allowed_types (cls , v ):
49
- validated_entity_type_urns = []
50
81
if v :
51
- with get_default_graph () as graph :
52
- for et in v :
53
- validated_urn = Urn .make_entity_type_urn (et )
54
- if graph .exists (validated_urn ):
55
- validated_entity_type_urns .append (validated_urn )
56
- else :
57
- logger .warn (
58
- f"Input { et } is not a valid entity type urn. Skipping."
59
- )
60
- v = validated_entity_type_urns
61
- if not v :
62
- logger .warn ("No allowed_types given within type_qualifier." )
82
+ graph = StructuredPropertiesConfig .get_graph ()
83
+ validated_urn = Urn .make_entity_type_urn (v )
84
+ if not graph .exists (validated_urn ):
85
+ raise ValueError (
86
+ f"Input { v } is not a valid entity type urn. { VALID_ENTITY_TYPES_STRING } "
87
+ )
88
+ v = str (validated_urn )
63
89
return v
64
90
65
91
@@ -77,6 +103,18 @@ class StructuredProperties(ConfigModel):
77
103
type_qualifier : Optional [TypeQualifierAllowedTypes ] = None
78
104
immutable : Optional [bool ] = False
79
105
106
+ @validator ("entity_types" , each_item = True )
107
+ def validate_entity_types (cls , v ):
108
+ if v :
109
+ graph = StructuredPropertiesConfig .get_graph ()
110
+ validated_urn = Urn .make_entity_type_urn (v )
111
+ if not graph .exists (validated_urn ):
112
+ raise ValueError (
113
+ f"Input { v } is not a valid entity type urn. { VALID_ENTITY_TYPES_STRING } "
114
+ )
115
+ v = str (validated_urn )
116
+ return v
117
+
80
118
@property
81
119
def fqn (self ) -> str :
82
120
assert self .urn is not None
@@ -97,93 +135,99 @@ def urn_must_be_present(cls, v, values):
97
135
@staticmethod
98
136
def create (file : str , graph : Optional [DataHubGraph ] = None ) -> None :
99
137
emitter : DataHubGraph = graph if graph else get_default_graph ()
100
-
101
- with open (file ) as fp :
102
- structuredproperties : List [dict ] = yaml .safe_load (fp )
103
- for structuredproperty_raw in structuredproperties :
104
- structuredproperty = StructuredProperties .parse_obj (
105
- structuredproperty_raw
106
- )
107
- if not structuredproperty .type .islower ():
108
- structuredproperty .type = structuredproperty .type .lower ()
109
- logger .warn (
110
- f"Structured property type should be lowercase. Updated to { structuredproperty .type } "
138
+ with StructuredPropertiesConfig .use_graph (emitter ):
139
+ print ("Using graph" )
140
+ with open (file ) as fp :
141
+ structuredproperties : List [dict ] = yaml .safe_load (fp )
142
+ for structuredproperty_raw in structuredproperties :
143
+ structuredproperty = StructuredProperties .parse_obj (
144
+ structuredproperty_raw
111
145
)
112
- if not AllowedTypes .check_allowed_type (structuredproperty .type ):
113
- raise ValueError (
114
- f"Type { structuredproperty .type } is not allowed. Allowed types are { AllowedTypes .values ()} "
115
- )
116
- mcp = MetadataChangeProposalWrapper (
117
- entityUrn = structuredproperty .urn ,
118
- aspect = StructuredPropertyDefinitionClass (
119
- qualifiedName = structuredproperty .fqn ,
120
- valueType = Urn .make_data_type_urn (structuredproperty .type ),
121
- displayName = structuredproperty .display_name ,
122
- description = structuredproperty .description ,
123
- entityTypes = [
124
- Urn .make_entity_type_urn (entity_type )
125
- for entity_type in structuredproperty .entity_types or []
126
- ],
127
- cardinality = structuredproperty .cardinality ,
128
- immutable = structuredproperty .immutable ,
129
- allowedValues = (
130
- [
131
- PropertyValueClass (
132
- value = v .value , description = v .description
133
- )
134
- for v in structuredproperty .allowed_values
135
- ]
136
- if structuredproperty .allowed_values
137
- else None
138
- ),
139
- typeQualifier = (
140
- {
141
- "allowedTypes" : structuredproperty .type_qualifier .allowed_types
142
- }
143
- if structuredproperty .type_qualifier
144
- else None
146
+ if not structuredproperty .type .islower ():
147
+ structuredproperty .type = structuredproperty .type .lower ()
148
+ logger .warn (
149
+ f"Structured property type should be lowercase. Updated to { structuredproperty .type } "
150
+ )
151
+ if not AllowedTypes .check_allowed_type (structuredproperty .type ):
152
+ raise ValueError (
153
+ f"Type { structuredproperty .type } is not allowed. Allowed types are { AllowedTypes .values ()} "
154
+ )
155
+ mcp = MetadataChangeProposalWrapper (
156
+ entityUrn = structuredproperty .urn ,
157
+ aspect = StructuredPropertyDefinitionClass (
158
+ qualifiedName = structuredproperty .fqn ,
159
+ valueType = Urn .make_data_type_urn (structuredproperty .type ),
160
+ displayName = structuredproperty .display_name ,
161
+ description = structuredproperty .description ,
162
+ entityTypes = [
163
+ Urn .make_entity_type_urn (entity_type )
164
+ for entity_type in structuredproperty .entity_types or []
165
+ ],
166
+ cardinality = structuredproperty .cardinality ,
167
+ immutable = structuredproperty .immutable ,
168
+ allowedValues = (
169
+ [
170
+ PropertyValueClass (
171
+ value = v .value , description = v .description
172
+ )
173
+ for v in structuredproperty .allowed_values
174
+ ]
175
+ if structuredproperty .allowed_values
176
+ else None
177
+ ),
178
+ typeQualifier = (
179
+ {
180
+ "allowedTypes" : structuredproperty .type_qualifier .allowed_types
181
+ }
182
+ if structuredproperty .type_qualifier
183
+ else None
184
+ ),
145
185
),
146
- ),
147
- )
148
- emitter .emit_mcp (mcp )
186
+ )
187
+ emitter .emit_mcp (mcp )
149
188
150
- logger .info (f"Created structured property { structuredproperty .urn } " )
189
+ logger .info (f"Created structured property { structuredproperty .urn } " )
151
190
152
191
@classmethod
153
192
def from_datahub (cls , graph : DataHubGraph , urn : str ) -> "StructuredProperties" :
154
193
155
- structured_property : Optional [
156
- StructuredPropertyDefinitionClass
157
- ] = graph .get_aspect (urn , StructuredPropertyDefinitionClass )
158
- if structured_property is None :
159
- raise Exception (
160
- "StructuredPropertyDefinition aspect is None. Unable to create structured property."
194
+ with StructuredPropertiesConfig .use_graph (graph ):
195
+ structured_property : Optional [
196
+ StructuredPropertyDefinitionClass
197
+ ] = graph .get_aspect (urn , StructuredPropertyDefinitionClass )
198
+ if structured_property is None :
199
+ raise Exception (
200
+ "StructuredPropertyDefinition aspect is None. Unable to create structured property."
201
+ )
202
+ return StructuredProperties (
203
+ urn = urn ,
204
+ qualified_name = structured_property .qualifiedName ,
205
+ display_name = structured_property .displayName ,
206
+ type = structured_property .valueType ,
207
+ description = structured_property .description ,
208
+ entity_types = structured_property .entityTypes ,
209
+ cardinality = structured_property .cardinality ,
210
+ allowed_values = (
211
+ [
212
+ AllowedValue (
213
+ value = av .value ,
214
+ description = av .description ,
215
+ )
216
+ for av in structured_property .allowedValues or []
217
+ ]
218
+ if structured_property .allowedValues is not None
219
+ else None
220
+ ),
221
+ type_qualifier = (
222
+ {
223
+ "allowed_types" : structured_property .typeQualifier .get (
224
+ "allowedTypes"
225
+ )
226
+ }
227
+ if structured_property .typeQualifier
228
+ else None
229
+ ),
161
230
)
162
- return StructuredProperties (
163
- urn = urn ,
164
- qualified_name = structured_property .qualifiedName ,
165
- display_name = structured_property .displayName ,
166
- type = structured_property .valueType ,
167
- description = structured_property .description ,
168
- entity_types = structured_property .entityTypes ,
169
- cardinality = structured_property .cardinality ,
170
- allowed_values = (
171
- [
172
- AllowedValue (
173
- value = av .value ,
174
- description = av .description ,
175
- )
176
- for av in structured_property .allowedValues or []
177
- ]
178
- if structured_property .allowedValues is not None
179
- else None
180
- ),
181
- type_qualifier = (
182
- {"allowed_types" : structured_property .typeQualifier .get ("allowedTypes" )}
183
- if structured_property .typeQualifier
184
- else None
185
- ),
186
- )
187
231
188
232
def to_yaml (
189
233
self ,
0 commit comments